Skip to content

Commit

Permalink
implement repetitions in js and python (#724)
Browse files Browse the repository at this point in the history
  • Loading branch information
samnoyes committed May 23, 2024
2 parents 505e779 + 6d0f5bf commit 69d4dde
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 17 deletions.
4 changes: 2 additions & 2 deletions js/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "langsmith",
"version": "0.1.28",
"version": "0.1.29",
"description": "Client library to connect to the LangSmith LLM Tracing and Evaluation Platform.",
"packageManager": "[email protected]",
"files": [
Expand Down Expand Up @@ -248,4 +248,4 @@
},
"./package.json": "./package.json"
}
}
}
22 changes: 21 additions & 1 deletion js/src/evaluation/_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ interface _ExperimentManagerArgs {
unknown
>;
examples?: Example[];
numRepetitions?: number;
_runsArray?: Run[];
}

Expand Down Expand Up @@ -110,6 +111,12 @@ export interface EvaluateOptions {
* @default undefined
*/
client?: Client;
/**
* The number of repetitions to perform. Each example
* will be run this many times.
* @default 1
*/
numRepetitions?: number;
}

export function evaluate(
Expand Down Expand Up @@ -149,6 +156,8 @@ class _ExperimentManager {

_examples?: Example[];

_numRepetitions?: number;

_runsArray?: Run[];

client: Client;
Expand Down Expand Up @@ -183,7 +192,15 @@ class _ExperimentManager {
for await (const example of unresolvedData) {
exs.push(example);
}
this.setExamples(exs);
if (this._numRepetitions && this._numRepetitions > 0) {
const repeatedExamples = [];
for (let i = 0; i < this._numRepetitions; i++) {
repeatedExamples.push(...exs);
}
this.setExamples(repeatedExamples);
} else {
this.setExamples(exs);
}
}
return this._examples;
}
Expand Down Expand Up @@ -264,6 +281,7 @@ class _ExperimentManager {

this._evaluationResults = args.evaluationResults;
this._summaryResults = args.summaryResults;
this._numRepetitions = args.numRepetitions;
}

_getExperiment(): TracerSession {
Expand Down Expand Up @@ -339,6 +357,7 @@ class _ExperimentManager {
const firstExample = examples[0];
const project = await this._getProject(firstExample);
await this._printExperimentStart();
this._metadata["num_repetitions"] = this._numRepetitions;
return new _ExperimentManager({
examples,
experiment: project,
Expand Down Expand Up @@ -803,6 +822,7 @@ async function _evaluate(
metadata: fields.metadata,
experiment: experiment_ ?? fields.experimentPrefix,
runs: newRuns ?? undefined,
numRepetitions: fields.numRepetitions ?? 1,
}).start();

if (_isCallable(target)) {
Expand Down
2 changes: 1 addition & 1 deletion js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ export type {
export { RunTree, type RunTreeConfig } from "./run_trees.js";

// Update using yarn bump-version
export const __version__ = "0.1.28";
export const __version__ = "0.1.29";
29 changes: 29 additions & 0 deletions js/src/tests/evaluate.int.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,35 @@ test("evaluate can evaluate", async () => {
expect(secondRunResults.results).toHaveLength(0);
});

test("evaluate can repeat", async () => {
  // Probe target: logs what it receives and returns the input bumped by one,
  // so each result row's output reveals which example produced it.
  const targetFunc = (input: Record<string, any>) => {
    console.log("__input__", input);
    return {
      foo: input.input + 1,
    };
  };

  const evalRes = await evaluate(targetFunc, {
    data: TESTING_DATASET_NAME,
    description: "Experiment from evaluate can evaluate integration test",
    numRepetitions: 3,
  });
  // Two dataset examples, each repeated three times -> six result rows.
  expect(evalRes.results).toHaveLength(6);

  for (const row of evalRes.results) {
    expect(row.run).toBeDefined();
    expect(row.example).toBeDefined();
    expect(row.evaluationResults).toBeDefined();
    // The examples are not always in the same order, so it should always be 2 or 3
    expect(row.run.outputs?.foo).toBeGreaterThanOrEqual(2);
    expect(row.run.outputs?.foo).toBeLessThanOrEqual(3);

    expect(row.evaluationResults.results).toHaveLength(0);
  }
});

test("evaluate can evaluate with RunEvaluator evaluators", async () => {
const targetFunc = (input: { input: number }) => {
return { foo: input.input + 1 };
Expand Down
65 changes: 55 additions & 10 deletions python/langsmith/evaluation/_arunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
List,
Optional,
Sequence,
TypeVar,
Union,
cast,
)
Expand Down Expand Up @@ -61,6 +62,7 @@ async def aevaluate(
experiment_prefix: Optional[str] = None,
description: Optional[str] = None,
max_concurrency: Optional[int] = None,
num_repetitions: int = 1,
client: Optional[langsmith.Client] = None,
blocking: bool = True,
) -> AsyncExperimentResults:
Expand All @@ -81,6 +83,9 @@ async def aevaluate(
description (Optional[str]): A description of the experiment.
max_concurrency (Optional[int]): The maximum number of concurrent
evaluations to run. Defaults to None.
num_repetitions (int): The number of times to run the evaluation.
Each item in the dataset will be run and evaluated this many times.
Defaults to 1.
client (Optional[langsmith.Client]): The LangSmith client to use.
Defaults to None.
blocking (bool): Whether to block until the evaluation is complete.
Expand Down Expand Up @@ -225,6 +230,7 @@ async def aevaluate(
experiment_prefix=experiment_prefix,
description=description,
max_concurrency=max_concurrency,
num_repetitions=num_repetitions,
client=client,
blocking=blocking,
)
Expand Down Expand Up @@ -343,6 +349,7 @@ async def _aevaluate(
experiment_prefix: Optional[str] = None,
description: Optional[str] = None,
max_concurrency: Optional[int] = None,
num_repetitions: int = 1,
client: Optional[langsmith.Client] = None,
blocking: bool = True,
experiment: Optional[schemas.TracerSession] = None,
Expand All @@ -363,6 +370,7 @@ async def _aevaluate(
metadata=metadata,
experiment=experiment_ or experiment_prefix,
description=description,
num_repetitions=num_repetitions,
runs=runs,
).astart()
cache_dir = ls_utils.get_cache_dir(None)
Expand Down Expand Up @@ -423,6 +431,7 @@ def __init__(
evaluation_results: Optional[AsyncIterable[EvaluationResults]] = None,
summary_results: Optional[AsyncIterable[EvaluationResults]] = None,
description: Optional[str] = None,
num_repetitions: int = 1,
):
super().__init__(
experiment=experiment,
Expand All @@ -437,10 +446,16 @@ def __init__(
)
self._evaluation_results = evaluation_results
self._summary_results = summary_results
self._num_repetitions = num_repetitions

def aget_examples(self) -> AsyncIterator[schemas.Example]:
async def aget_examples(self) -> AsyncIterator[schemas.Example]:
if self._examples is None:
self._examples = _aresolve_data(self._data, client=self.client)
if self._num_repetitions > 1:
self._examples = async_chain_from_iterable(
aitertools.atee(self._examples, self._num_repetitions)
)

self._examples, examples_iter = aitertools.atee(
aitertools.ensure_async_iterator(self._examples), 2, lock=asyncio.Lock()
)
Expand All @@ -450,7 +465,7 @@ async def get_dataset_id(self) -> str:
if self._experiment is None or not getattr(
self._experiment, "reference_dataset_id", None
):
example = await aitertools.py_anext(self.aget_examples())
example = await aitertools.py_anext(await self.aget_examples())
if example is None:
raise ValueError("No examples found in the dataset.")
return str(example.dataset_id)
Expand All @@ -467,7 +482,7 @@ async def aget_runs(self) -> AsyncIterator[schemas.Run]:

async def aget_evaluation_results(self) -> AsyncIterator[EvaluationResults]:
if self._evaluation_results is None:
async for _ in self.aget_examples():
async for _ in await self.aget_examples():
yield {"results": []}
else:
self._evaluation_results, evaluation_results = aitertools.atee(
Expand All @@ -479,13 +494,14 @@ async def aget_evaluation_results(self) -> AsyncIterator[EvaluationResults]:
yield result

async def astart(self) -> _AsyncExperimentManager:
first_example = await aitertools.py_anext(self.aget_examples())
first_example = await aitertools.py_anext(await self.aget_examples())
if not first_example:
raise ValueError("No examples found in the dataset.")
project = self._get_project(first_example)
self._print_experiment_start(project, first_example)
self._metadata["num_repetitions"] = self._num_repetitions
return self.__class__(
self.aget_examples(),
await self.aget_examples(),
experiment=project,
metadata=self._metadata,
client=self.client,
Expand Down Expand Up @@ -535,7 +551,7 @@ async def awith_summary_evaluators(
wrapped_evaluators = _wrap_summary_evaluators(summary_evaluators)
aggregate_feedback_gen = self._aapply_summary_evaluators(wrapped_evaluators)
return _AsyncExperimentManager(
self.aget_examples(),
await self.aget_examples(),
experiment=self._experiment,
metadata=self._metadata,
client=self.client,
Expand All @@ -546,7 +562,7 @@ async def awith_summary_evaluators(

async def aget_results(self) -> AsyncIterator[ExperimentResultRow]:
async for run, example, evaluation_results in aitertools.async_zip(
self.aget_runs(), self.aget_examples(), self.aget_evaluation_results()
self.aget_runs(), await self.aget_examples(), self.aget_evaluation_results()
):
yield ExperimentResultRow(
run=run,
Expand All @@ -573,7 +589,7 @@ async def _apredict(
fn = _ensure_async_traceable(target)

async def predict_all():
async for example in self.aget_examples():
async for example in await self.aget_examples():
# Yield the coroutine to be awaited later
yield _aforward(
fn, example, self.experiment_name, self._metadata, self.client
Expand Down Expand Up @@ -645,7 +661,7 @@ async def _aapply_summary_evaluators(
self, summary_evaluators: Sequence[SUMMARY_EVALUATOR_T]
) -> AsyncIterator[EvaluationResults]:
runs, examples = [], []
async_examples = aitertools.ensure_async_iterator(self.aget_examples())
async_examples = aitertools.ensure_async_iterator(await self.aget_examples())
async for run, example in aitertools.async_zip(
self.aget_runs(), async_examples
):
Expand Down Expand Up @@ -694,7 +710,7 @@ async def _aapply_summary_evaluators(

async def _get_dataset_version(self) -> Optional[str]:
modified_at = []
async for example in self.aget_examples():
async for example in await self.aget_examples():
if example.modified_at:
# Should always be defined in practice when fetched,
# but the typing permits None
Expand All @@ -703,13 +719,30 @@ async def _get_dataset_version(self) -> Optional[str]:
max_modified_at = max(modified_at) if modified_at else None
return max_modified_at.isoformat() if max_modified_at else None

async def _get_dataset_splits(self) -> Optional[list[str]]:
    """Collect the distinct split names across this experiment's examples.

    Each example whose metadata carries a list-valued ``dataset_split``
    contributes its string entries; every other example is counted under
    the implicit "base" split.
    """
    split_names: set = set()
    async for example in await self.aget_examples():
        raw_split = (example.metadata or {}).get("dataset_split")
        if raw_split and isinstance(raw_split, list):
            # Only string entries are meaningful split names; ignore the rest.
            split_names.update(s for s in raw_split if isinstance(s, str))
        else:
            split_names.add("base")

    return list(split_names)

async def _aend(self) -> None:
experiment = self._experiment
if experiment is None:
raise ValueError("Experiment not started yet.")

project_metadata = self._get_experiment_metadata()
project_metadata["dataset_version"] = await self._get_dataset_version()
project_metadata["dataset_splits"] = await self._get_dataset_splits()
self.client.update_project(
experiment.id,
end_time=datetime.datetime.now(datetime.timezone.utc),
Expand Down Expand Up @@ -823,3 +856,15 @@ def _aresolve_data(
if isinstance(data, AsyncIterable):
return aitertools.ensure_async_iterator(data)
return aitertools.ensure_async_iterator(_resolve_data(data, client=client))


T = TypeVar("T")


async def async_chain_from_iterable(
    iterable: Iterable[AsyncIterable[T]],
) -> AsyncIterator[T]:
    """Yield every item from each async iterable, one source at a time.

    Async analogue of ``itertools.chain.from_iterable``: the first source is
    fully exhausted before the next one is started.
    """
    for source in iterable:
        async for element in source:
            yield element

0 comments on commit 69d4dde

Please sign in to comment.