Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(sdk): add prototype python wrapper for nexus #7373

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/cmd/wandb-core/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/getsentry/sentry-go"
"github.com/wandb/wandb/core/pkg/observability"
"github.com/wandb/wandb/core/pkg/server"
"github.com/wandb/wandb/core/internal/processlib"
)

// this is set by the build script and used by the observability package
Expand All @@ -33,6 +34,11 @@ func main() {

flag.Parse()

if *pid != 0 {
// Let's make sure the OS kills this process when the parent dies (if supported)
processlib.ShutdownOnParentDeath(*pid)
}

// set up sentry reporting
observability.InitSentry(*disableAnalytics, commit)
defer sentry.Flush(2)
Expand Down Expand Up @@ -84,7 +90,7 @@ func main() {
}
defer trace.Stop()
}
serve, err := server.NewServer(ctx, "127.0.0.1:0", *portFilename)
serve, err := server.NewServer(ctx, "127.0.0.1:0", *portFilename, *pid)
if err != nil {
slog.Error("failed to start server, exiting", "error", err)
return
Expand Down
9 changes: 5 additions & 4 deletions core/internal/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func readLines(path string) ([]string, error) {

// Launcher holds the values that LaunchCommand and LaunchBinary pass to the
// process they start.
type Launcher struct {
// portFilename is handed to the launched process via --port-filename.
portFilename string
// pidParent is handed to the launched process via --pid.
pidParent int
}

func (l *Launcher) tryport() (int, error) {
Expand Down Expand Up @@ -80,7 +81,7 @@ func (l *Launcher) prepTempfile() {

func (l *Launcher) LaunchCommand(command string) (*execbin.ForkExecCmd, error) {
l.prepTempfile()
args := []string{"--port-filename", l.portFilename}
// strconv.Itoa renders the PID as decimal text; string(int) would instead
// produce the single rune with that code point, so --pid would be garbage.
args := []string{"--port-filename", l.portFilename, "--pid", strconv.Itoa(l.pidParent)}
cmd, err := execbin.ForkExecCommand(command, args)
if err != nil {
panic(err)
Expand All @@ -91,14 +92,14 @@ func (l *Launcher) LaunchCommand(command string) (*execbin.ForkExecCmd, error) {
// LaunchBinary calls prepTempfile and then fork-execs the binary contained in
// filePayload, passing the port filename and the parent PID as flags.
// It panics if execbin.ForkExec fails, so the returned error is always nil.
func (l *Launcher) LaunchBinary(filePayload []byte) (*execbin.ForkExecCmd, error) {
l.prepTempfile()

args := []string{"--port-filename", l.portFilename}
args := []string{"--port-filename", l.portFilename, "--pid", strconv.Itoa(l.pidParent)}
cmd, err := execbin.ForkExec(filePayload, args)
if err != nil {
panic(err)
}
return cmd, err
}

func NewLauncher() *Launcher {
return &Launcher{}
func NewLauncher(pidParent int) *Launcher {
return &Launcher{pidParent: pidParent}
}
22 changes: 22 additions & 0 deletions core/internal/processlib/processlib_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package processlib

import (
"os"
"syscall"
)

// Constants for the prctl(2) call that arms the parent-death signal.
//
// The values are taken from the syscall package instead of being hard-coded:
// the prctl syscall number is architecture specific (157 on amd64, 167 on
// arm64), so a literal 157 invokes the wrong syscall on other platforms.
// The names are kept as-is so existing references keep compiling.
const (
	PRCTL_SYSCALL    = syscall.SYS_PRCTL
	PR_SET_PDEATHSIG = syscall.PR_SET_PDEATHSIG
)

// ShutdownOnParentDeath asks the kernel to send SIGKILL to this process when
// its parent dies, then checks that the parent is still parentPid.
//
// parentPid is the PID the caller expects os.Getppid to report. The process
// exits with status 127+errno if the prctl call fails, and with status 1 if
// the current parent is not parentPid.
func ShutdownOnParentDeath(parentPid int) {
	// Use the syscall package's constants rather than literal numbers: the
	// prctl syscall number differs between Linux architectures.
	_, _, errno := syscall.Syscall(syscall.SYS_PRCTL, syscall.PR_SET_PDEATHSIG, uintptr(syscall.SIGKILL), 0)
	if errno != 0 {
		os.Exit(127 + int(errno))
	}
	// The parent may have died just before the death signal was armed, in
	// which case no signal will ever be delivered; detect that by checking
	// who the parent is now.
	if os.Getppid() != parentPid {
		os.Exit(1)
	}
}
6 changes: 6 additions & 0 deletions core/internal/processlib/processlib_notlinux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
//go:build !linux

package processlib

func ShutdownOnParentDeath(parentPid int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

poor process :(

}
65 changes: 65 additions & 0 deletions core/lang/core/interface_pb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package main

import (
"C"
"unsafe"
"google.golang.org/protobuf/proto"
"github.com/wandb/wandb/core/pkg/service"
"github.com/wandb/wandb/core/pkg/gowandb/opts/runopts"
)

// pbSessionSetup is the C-exported entry point that runs wandbcoreSetup.
//
//export pbSessionSetup
func pbSessionSetup() {
wandbcoreSetup()
}

// pbSessionTeardown is the C-exported entry point that runs wandbcoreTeardown.
//
//export pbSessionTeardown
func pbSessionTeardown() {
// TODO: probably don't want this; nexus could be shared across "sessions".
wandbcoreTeardown()
}

//export pbRunStart
func pbRunStart() int {
options := []runopts.RunOption{}
wandbcoreSetup()
run, err := wandbSession.NewRun(options...)
if err != nil {
panic(err)
}
num := wandbRuns.Add(run)
return num
}

//export pbRunLog
func pbRunLog(num int, cBuffer *C.char, cLength C.int) {
data := C.GoBytes(unsafe.Pointer(cBuffer), cLength)
// Unmarshal protobuf
msg := &service.DataRecord{}
if err := proto.Unmarshal(data, msg); err != nil {
return
}
// Process data (here simply prepending a string)
run := wandbRuns.Get(num)

dict := make(map[string]interface{})
for k, v := range msg.Item {
switch value := v.DataType.(type) {
case *service.DataValue_ValueInt:
dict[k] = value.ValueInt
case *service.DataValue_ValueDouble:
dict[k] = value.ValueDouble
case *service.DataValue_ValueString:
dict[k] = value.ValueString
}
}

run.Log(dict)
}

// pbRunFinish finishes the run registered under num and removes it from
// wandbRuns. It is a no-op when no run is registered under num, so finishing
// the same handle twice does not dereference a nil run.
//
//export pbRunFinish
func pbRunFinish(num int) {
	run := wandbRuns.Get(num)
	if run == nil {
		return
	}
	run.Finish()
	wandbRuns.Remove(num)
}
2 changes: 2 additions & 0 deletions core/lang/core/runkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ func NewRunKeeper() *RunKeeper {
}

// Get returns the run stored under num. The keeper's mutex is held for the
// duration of the lookup.
func (k *RunKeeper) Get(num int) *gowandb.Run {
	k.mutex.Lock()
	defer k.mutex.Unlock()

	run := k.runs[num]
	return run
}

Expand Down
3 changes: 3 additions & 0 deletions core/lang/core/wandbcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ typedef enum {
import "C"

import (
"os"
"unsafe"

"github.com/wandb/wandb/core/internal/gowandb/internal_runopts"
Expand All @@ -31,8 +32,10 @@ func wandbcoreSetup() {
return
}
var err error
currentPid := os.Getpid()
wandbSession, err = gowandb.NewSession(
sessionopts.WithCoreBinary(coreBinary),
sessionopts.WithPidParent(currentPid),
)
if err != nil {
panic(err)
Expand Down
1 change: 1 addition & 0 deletions core/lang/py/examples/local.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/bin/bash
# Times the given command with WANDB_BASE_URL set to http://localhost:8080.
# Usage: ./local.sh <command> [args...]
# "$@" forwards every argument intact; an unquoted $* would re-split
# arguments that contain whitespace.
time WANDB_BASE_URL=http://localhost:8080 "$@"
8 changes: 8 additions & 0 deletions core/lang/py/examples/multiple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/usr/bin/env python
import wandb

# Start ten runs up front, log the same row to each, then finish them all.
runs = []
for _ in range(10):
    runs.append(wandb.init())

for active_run in runs:
    active_run.log({"a": 1, "b": 2, "c": 4.0, "d": "blah"})

for active_run in runs:
    active_run.finish()
1 change: 1 addition & 0 deletions core/lang/py/examples/offline.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#!/bin/bash
# Times the given command with WANDB_MODE set to offline.
# Usage: ./offline.sh <command> [args...]
# "$@" forwards every argument intact; an unquoted $* would re-split
# arguments that contain whitespace.
time WANDB_MODE=offline "$@"
12 changes: 12 additions & 0 deletions core/lang/py/examples/setup-nexus-py.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Recreates the nexus-py-1 virtualenv from scratch, then runs the proto and
# library build scripts in ../lib and installs that package in editable mode.
set -e

VENV=nexus-py-1

rm -rf "$VENV"
virtualenv "$VENV"
source "$VENV/bin/activate"
pip install --upgrade pip

cd ../lib
./build_proto.sh
./build_lib.sh
pip install -e .

# Tell the user how to enter the environment that was just built.
echo "Run:"
echo "source $VENV/bin/activate"
40 changes: 40 additions & 0 deletions core/lang/py/examples/simple-newapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#!/usr/bin/env python
import wandb

"""
Top level:
new_api() -> API (? or sdk or library or core..or not)
default_api
default_session
default_run
(promote methods from default_api, default_session - and maybe default_run to top level namespace?)

API:
new_session -> Session

Session:
login()
configure_auth()
new_run()
get_run() # might have mutable and readonly versions of the run? readonly by default?
# alternate, prefix with object type? run_new, run_get... dont love
# ? are api runs just like runapi --> can we log to a run from the public api? why not?

Run:
log()
history() -> how does this work for a run in progress


"""

api = wandb.new_api()
session = api.new_session()

# Write path: create a run, remember its id, log a single row, and finish.
run = session.new_run()
run_id = run.id
run.log({"a": 1})
run.finish()

# Read path: look the run up again by id and walk through its history.
run = session.get_run(run_id)
for _row in run.history():
    pass
7 changes: 7 additions & 0 deletions core/lang/py/examples/simple-newsdk.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env python
import wandb

# Log one row through the session-based API, then close the run.
wb = wandb.new_session()
run = wb.new_run()
row = {"a": 1, "b": 2}
run.log(row)
run.finish()
6 changes: 6 additions & 0 deletions core/lang/py/examples/simple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#!/usr/bin/env python
import wandb

# Start a run, log one row of mixed-type values, and finish it.
run = wandb.init()
row = {"a": 1, "b": 2, "c": 4.0, "d": "blah"}
run.log(row)
run.finish()
27 changes: 27 additions & 0 deletions core/lang/py/examples/threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/usr/bin/env python

import argparse
from concurrent.futures import ThreadPoolExecutor
import time
import wandb

wandb.setup()

def do_run():
    """Start a run, log one row of mixed-type values, and finish it."""
    row = {"a": 1, "b": 2, "c": 4.0, "d": "blah"}
    current = wandb.init()
    current.log(row)
    current.finish()


# Parse the worker count; one run is submitted per worker.
parser = argparse.ArgumentParser(description="benchmark wandb performance")
parser.add_argument("--num-workers", "-n", type=int, default=20)
args = parser.parse_args()

with ThreadPoolExecutor(max_workers=args.num_workers) as executor:
    jobs = [executor.submit(do_run) for _ in range(args.num_workers)]

# result() re-raises any exception that occurred inside a worker.
for job in jobs:
    job.result()
print("done?")
5 changes: 5 additions & 0 deletions core/lang/py/lib/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
recursive-include wandb *.py
recursive-include wandb *.sh
recursive-include wandb *.so
recursive-exclude * __pycache__
recursive-exclude * *.py[co]
9 changes: 9 additions & 0 deletions core/lang/py/lib/build_lib.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
# Runs the base build and copies libwandb_core.so into the Python package's
# wandb/lib directory.
#
# All paths below are relative to this script's own directory, so move there
# first; the script then works regardless of the caller's working directory.

set -e
cd "$(dirname "${BASH_SOURCE[0]}")"

BASE=../..
DEST=py/lib

cd "$BASE"
./scripts/base-build.sh
mkdir -p "$DEST/wandb/lib"
cp export/lib/libwandb_core.so "$DEST/wandb/lib"
12 changes: 12 additions & 0 deletions core/lang/py/lib/build_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Generates the Python protobuf modules for the wandb .proto files, writing
# them under $DEST (resolved from $BASE).

set -e
BASE=../../../..
DEST=core/lang/py/lib/
# cp $BASE/wandb/proto/*.proto wandb/proto/
cd $BASE/
# One protoc invocation per proto file, in the same order as before.
for name in base internal telemetry settings server; do
    protoc -I=. --python_out=$DEST "wandb/proto/wandb_${name}.proto"
done
53 changes: 53 additions & 0 deletions core/lang/py/lib/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[build-system]
requires = ['setuptools>61']
build-backend = "setuptools.build_meta"

[project]
name = "wandb"
dynamic = ["version"]
description = "A CLI and library for interacting with the Weights & Biases API."
authors = [{ name = "Weights & Biases", email = "support@wandb.com" }]
# readme = "package_readme.md"
# license = { file = "LICENSE" }
requires-python = ">=3.7"
dependencies = [
"protobuf>=3.12.0,!=4.21.0,<5; python_version < '3.9' and sys_platform == 'linux'",
"protobuf>=3.15.0,!=4.21.0,<5; python_version == '3.9' and sys_platform == 'linux'",
"protobuf>=3.19.0,!=4.21.0,<5; python_version > '3.9' and sys_platform == 'linux'",
"protobuf>=3.19.0,!=4.21.0,<5; sys_platform != 'linux'",
"setuptools",
]
classifiers = [
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: MIT License",
"Natural Language :: English",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3 :: Only",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: System :: Logging",
"Topic :: System :: Monitoring",
]


#[project.scripts]
#wandb = "wandb.cli.cli:cli"
#wb = "wandb.cli.cli:cli"

[project.urls]
"Source" = "https://github.com/wandb/wandb"
"Bug Reports" = "https://github.com/wandb/wandb/issues"
"Documentation" = "https://docs.wandb.ai/"

[tool.setuptools]
zip-safe = false
packages = ["wandb"]
package-dir = { "wandb" = "wandb" }
3 changes: 3 additions & 0 deletions core/lang/py/lib/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Minimal setuptools entry point: setup() is called with no arguments.
# NOTE(review): presumably all project metadata comes from the adjacent
# pyproject.toml — confirm.
from setuptools import setup

setup()
2 changes: 2 additions & 0 deletions core/lang/py/lib/wandb/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
proto/
lib/