Filtering of listen() #2016

benjie opened this issue Apr 10, 2024 · 2 comments

@benjie
Member

benjie commented Apr 10, 2024

From discussion on Discord:

The main challenge I think will be coming up with the API that's suitable for everyone. But to start with, I'd copy the whole of the listen step into your own codebase, give it a new name (filteredListen or similar), then make the following changes:

  1. Accept a filter function in the constructor:

constructor(
  pubsubOrPlan:
    | ExecutableStep<GrafastSubscriber<TTopics> | null>
    | GrafastSubscriber<TTopics>
    | null,
  topicOrPlan: ExecutableStep<TTopic> | string,
  public itemPlan: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep = (
    $item,
  ) => $item as any,
) {

3. Apply this filter function to the stream coming from pubsub.subscribe(topic):

return pubsub.subscribe(topic);

pubsub.subscribe(topic) returns an async iterable, or potentially a promise of one:

subscribe<TTopic extends keyof TTopics = keyof TTopics>(
  topic: TTopic,
): PromiseOrDirect<AsyncIterableIterator<TTopics[TTopic]>>;

So you need to apply filtering to that async iterable by building a new async iterable that skips over values you don't want.
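
A minimal sketch of that wrapping step, assuming nothing beyond the `subscribe` signature quoted above. The names `filterAsyncIterable`, `fakeSubscribe` and `collectEven` are hypothetical and are not part of grafast; `PromiseOrDirect` is redeclared locally so the snippet stands on its own.

```typescript
// Local stand-in for grafast's PromiseOrDirect type (assumption: same shape).
type PromiseOrDirect<T> = T | PromiseLike<T>;

// Hypothetical helper: wraps a source async iterable (or a promise of one)
// and yields only the values for which `keep` returns true.
async function* filterAsyncIterable<T>(
  source: PromiseOrDirect<AsyncIterable<T>>,
  keep: (value: T) => boolean,
): AsyncGenerator<T, void, undefined> {
  // `await` unwraps a promise and passes a plain iterable through unchanged.
  const iterable = await source;
  for await (const value of iterable) {
    if (keep(value)) {
      yield value;
    }
  }
}

// Hypothetical stand-in for `pubsub.subscribe(topic)`, used only for the demo.
async function* fakeSubscribe(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
  yield 4;
}

// Collects the even values from the fake subscription.
async function collectEven(): Promise<number[]> {
  const out: number[] = [];
  const evens = filterAsyncIterable(fakeSubscribe(), (n) => n % 2 === 0);
  for await (const n of evens) {
    out.push(n);
  }
  return out;
}
```

Inside a copied listen step, the idea would be to return something like `filterAsyncIterable(pubsub.subscribe(topic), keep)` in place of the bare `pubsub.subscribe(topic)`. Because the wrapper is itself an async generator, ending the outer iteration early also ends the inner `for await`, which gives the underlying subscription a chance to clean up.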

@1mehal

1mehal commented Apr 10, 2024

@benjie I'll give it a try.

@1mehal

1mehal commented Apr 11, 2024

@benjie I got it to work for the SimpleSubscriptions plugin with the following code:

/**
 * Subscribes to the given `pubsubOrPlan` to get realtime updates on a given
 * topic (`topicOrPlan`), mapping the resulting event via the `itemPlan`
 * callback. Events rejected by the optional `filterFunc` are skipped.
 */
export class FilteredListenStep<
    TTopics extends { [topic: string]: any },
    TTopic extends keyof TTopics,
    TPayloadStep extends ExecutableStep
  >
  extends ExecutableStep<TTopics[TTopic]>
  implements StreamableStep<TTopics[TTopic]>
{
  // `$$export` is intentionally omitted: this class lives in your own
  // codebase, so it must not claim to be grafast's `ListenStep`.
  isSyncAndSafe = true;

  /**
   * Dependency index of the step that provides the GrafastSubscriber.
   */
  private pubsubDep: number;

  /**
   * Dependency index of the step that tells us which topic we're
   * subscribing to.
   */
  private topicDep: number;

  /**
   * Dependency index of the step that provides the event type to filter on.
   */
  private eventTypeDep: number;

  private eventTypePlan?: ExecutableStep<string | null>;

  // Optional predicate; events for which it returns false are skipped
  private filterFunc:
    | ((item: TTopics[TTopic], eventType: string | null) => boolean)
    | null;

  constructor(
    pubsubOrPlan:
      | ExecutableStep<GrafastSubscriber<TTopics> | null>
      | GrafastSubscriber<TTopics>
      | null,
    topicOrPlan: ExecutableStep<TTopic> | string,
    public itemPlan: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep = (
      $item
    ) => $item as any,
    eventTypePlan?: ExecutableStep<string | null>,
    filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
  ) {
    super();
    const $topic =
      typeof topicOrPlan === "string" ? constant(topicOrPlan) : topicOrPlan;
    const $pubsub = isExecutableStep(pubsubOrPlan)
      ? pubsubOrPlan
      : constant(pubsubOrPlan, false);
    const $eventType = eventTypePlan || constant(null);
    this.pubsubDep = this.addDependency($pubsub);
    this.topicDep = this.addDependency($topic);
    this.eventTypeDep = this.addDependency($eventType);
    this.eventTypePlan = eventTypePlan;
    this.filterFunc = filterFunc || null;
  }

  execute(): never {
    throw new Error(
      "FilteredListenStep cannot be executed, it can only be streamed"
    );
  }

  stream(
    count: number,
    values: readonly [
      GrafastValuesList<GrafastSubscriber<TTopics>>,
      GrafastValuesList<TTopic>,
      GrafastValuesList<string | null>
    ]
  ): GrafastResultStreamList<TTopics[TTopic]> {
    const pubsubs = values[this.pubsubDep as 0];
    const topics = values[this.topicDep as 1];
    const eventTypes = values[this.eventTypeDep as 2];
    const result = [];
    for (let i = 0; i < count; i++) {
      const pubsub = pubsubs[i];
      if (!pubsub) {
        throw new SafeError(
          "Subscription not supported",
          isDev
            ? {
                hint: `${
                  this.dependencies[this.pubsubDep]
                } did not provide a GrafastSubscriber; perhaps you forgot to add the relevant property to context?`,
              }
            : {}
        );
      }
      const topic = topics[i];
      const eventType = eventTypes[i]; // Actual value of eventType
      // Capture the filter in a local so the generator does not rely on `this`
      const { filterFunc } = this;
      result[i] = (async function* () {
        // subscribe() may return the iterable directly or a promise of it;
        // `await` handles both cases
        const asyncIterable = await pubsub.subscribe(topic);
        for await (const item of asyncIterable) {
          if (!filterFunc || filterFunc(item, eventType)) {
            yield item;
          }
        }
      })();
    }
    return result;
  }
}

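/**
 * Subscribes to the given `pubsubOrPlan` to get realtime updates on a given
 * topic (`topicOrPlan`), mapping the resulting event via the `itemPlan`
 * callback. If `filterFunc` is given, only events it accepts are emitted;
 * it receives the raw event and the runtime value of `eventTypePlan`.
 */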
export function filteredListen<
  TTopics extends { [topic: string]: any },
  TTopic extends keyof TTopics,
  TPayloadStep extends ExecutableStep
>(
  pubsubOrPlan:
    | ExecutableStep<GrafastSubscriber<TTopics> | null>
    | GrafastSubscriber<TTopics>
    | null,
  topicOrPlan: ExecutableStep<TTopic> | string,
  itemPlan?: (itemPlan: __ItemStep<TTopics[TTopic]>) => TPayloadStep,
  eventTypePlan?: ExecutableStep<string | null>, // Step yielding the event type to filter on
  filterFunc?: (item: TTopics[TTopic], eventType: string | null) => boolean
): FilteredListenStep<TTopics, TTopic, TPayloadStep> {
  return new FilteredListenStep<TTopics, TTopic, TPayloadStep>(
    pubsubOrPlan,
    topicOrPlan,
    itemPlan,
    eventTypePlan,
    filterFunc
  );
}

const SimpleSubscriptionsPlugin = makeExtendSchemaPlugin((build) => {
  const nodeIdHandlerByTypeName = build.getNodeIdHandlerByTypeName?.();
  return {
    typeDefs: [
      gql`
        extend type Subscription {
          listen(topic: String!, eventType: String): ListenPayload
        }
        type ListenPayload {
          event: String
        }
      `,
      ...// Only add the relatedNode if supported
      (nodeIdHandlerByTypeName
        ? [
            gql`
              extend type ListenPayload {
                relatedNode: Node
                relatedNodeId: ID
              }
            `,
          ]
        : []),
    ],
    plans: {
      Subscription: {
        listen: {
          subscribePlan: EXPORTABLE(
            (context, jsonParse, lambda, filteredListen) =>
              function subscribePlan(_$root, { $topic, $eventType = null }) {
                const $pgSubscriber = context().get("pgSubscriber");
                const $derivedTopic = lambda($topic, (topic) => {
                  return `postgraphile:${topic}`;
                });
                const eventTypePlan = lambda(
                  $eventType,
                  (eventTypeValue) => eventTypeValue
                );
                const filterFunction = (item: any, eventTypeValue: any) => {
                  const parsedItem = JSON.parse(item);
                  return !eventTypeValue || parsedItem.event === eventTypeValue;
                };

                return filteredListen(
                  $pgSubscriber,
                  $derivedTopic,
                  jsonParse,
                  eventTypePlan,
                  filterFunction
                );
              },
            [context, jsonParse, lambda, filteredListen]
          ),
          plan: EXPORTABLE(
            () =>
              function plan($event) {
                return $event;
              },
            []
          ),
        },
      },
      ListenPayload: {
        event($event) {
          return $event.get("event");
        },
        ...(nodeIdHandlerByTypeName
          ? {
              relatedNodeId($event) {
                return nodeIdFromEvent($event);
              },
              relatedNode($event) {
                const $nodeId = nodeIdFromEvent($event);
                return node(nodeIdHandlerByTypeName, $nodeId);
              },
            }
          : null),
      },
    },
  };
});
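
The filter predicate can also be pulled out and checked on its own, without a running subscription. The sketch below is a variation on the `filterFunction` above, not the same code: the name `matchesEventType` and the `ListenEvent` shape are hypothetical, and dropping malformed payloads instead of throwing is an assumption made for illustration.

```typescript
// Hypothetical payload shape, assumed only for this example.
interface ListenEvent {
  event?: string;
}

// Returns true when the raw (still JSON-encoded) item should be emitted.
// With no event type requested, every item passes through unparsed.
function matchesEventType(rawItem: string, eventType: string | null): boolean {
  if (!eventType) {
    return true;
  }
  try {
    const parsed = JSON.parse(rawItem) as ListenEvent | null;
    return parsed?.event === eventType;
  } catch {
    // Assumption: a payload that is not valid JSON cannot match a filter,
    // so it is skipped rather than ending the whole stream with an error.
    return false;
  }
}
```

Passing `matchesEventType` as the `filterFunc` argument of `filteredListen` would keep the plan unchanged while making the predicate easy to unit test.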

Projects
Status: 🐭 Shrew