Skip to content

Commit

Permalink
add gb_test func:TestGbPublishWithApi
Browse files Browse the repository at this point in the history
  • Loading branch information
duiniuluantanqin committed Mar 27, 2024
1 parent ba42a40 commit 9f665e2
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 46 deletions.
82 changes: 79 additions & 3 deletions trunk/3rdparty/srs-bench/gb28181/gb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,11 +23,18 @@ package gb28181
import (
"context"
"fmt"
"io"
"math/rand"
"strconv"
"sync"
"testing"
"time"

"github.com/ossrs/srs-bench/srs"

"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"testing"
"time"
)

func TestGbPublishRegularly(t *testing.T) {
Expand Down Expand Up @@ -523,3 +530,72 @@ func TestGbPublishUnregister(t *testing.T) {
t.Errorf("err %+v", err)
}
}

func TestGbPublishWithApi(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
defer cancel()

psConfig := PSConfig{
video: *srsPublishVideo,
fps: *srsPublishVideoFps,
audio: *srsPublishAudio,
}

var r0, r1, r2 error
r0 = func() error {
// generate a random ssrc, and use it as the stream name.
ssrc := rand.Intn(900000000) + 1000000000
stream := strconv.FormatUint(uint64(ssrc), 10)

port, err := apiGbPublishRequest(ctx, stream, strconv.Itoa(ssrc))
if err != nil {
return errors.Wrapf(err, "apiGbPublishRequest stream=%v", stream)
}

ingester := NewPSIngester(&IngesterConfig{
psConfig: psConfig,
ssrc: uint32(ssrc),
clockRate: 90000,
payloadType: 96,
})
defer ingester.Close()

var wg sync.WaitGroup
defer wg.Wait()

// Check the stream is publishing.
wg.Add(1)
go func() {
defer wg.Done()
for ctx.Err() == nil {
stat := srs.NewStatApi(ctx).Streams().FilterByStreamSuffix(stream)
logger.Tf(ctx, "Check publishing, stream=%v", stat.Stream())
if stat.Stream() != nil {
return
}
time.Sleep(1 * time.Second)
}
r1 = ctx.Err()
}()

// Ingest the stream.
wg.Add(1)
go func() {
defer wg.Done()
ingester.conf.serverAddr = "tcp://localhost:" + strconv.Itoa(port)
if err := ingester.Ingest(ctx); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, video=%v, audio=%v", psConfig.video, psConfig.audio)
return
}
r2 = err
}
}()
return err
}()

if err := filterTestError(ctx.Err(), r0, r1, r2); err != nil {
t.Errorf("err %+v", err)
}
}
54 changes: 11 additions & 43 deletions trunk/3rdparty/srs-bench/gb28181/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@ package gb28181
import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
Expand All @@ -39,7 +36,7 @@ import (
"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/aac"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/ossrs/srs-bench/srs"
"github.com/yapingcat/gomedia/mpeg2"
)

Expand Down Expand Up @@ -365,52 +362,23 @@ func (v *AACReader) NextADTSFrame() ([]byte, error) {
return adts, nil
}

// Request SRS GB API, the apiPath like "/gb/v1/publish",
//
// Return the response of answer in string.
func apiGbRequest(ctx context.Context, apiPath, host, stream, ssrc string) (string, error) {
api := host
if !strings.HasPrefix(apiPath, "/") {
api += "/"
}
api += apiPath

if !strings.HasSuffix(apiPath, "/") {
api += "/"
}

// Build JSON body.
reqBody := struct {
func apiGbPublishRequest(ctx context.Context, stream, ssrc string) (int, error) {
req := struct {
ClientIP string `json:"clientip"`
Stream string `json:"stream"`
SSRC string `json:"ssrc"`
}{
host, stream, ssrc,
}

b, err := json.Marshal(reqBody)
if err != nil {
return "", errors.Wrapf(err, "Marshal body %v", reqBody)
"", stream, ssrc,
}
logger.If(ctx, "Request url api=%v with %v", api, string(b))
logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b))

req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
if err != nil {
return "", errors.Wrapf(err, "HTTP request %v", string(b))
}
res := struct {
Code int `json:"code"`
Port int `json:"port"`
}{}

res, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
}

b2, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", errors.Wrapf(err, "Read response for %v", string(b))
if err := srs.ApiRequest(ctx, "http://localhost:1985/gb/v1/publish/", req, &res); err != nil {
return 0, errors.Wrapf(err, "gb/v1/publish")
}
logger.If(ctx, "Response from %v is %v", api, string(b2))
logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2))

return string(b2), nil
return res.Port, nil
}
12 changes: 12 additions & 0 deletions trunk/3rdparty/srs-bench/srs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func apiRequest(ctx context.Context, r string, req interface{}, res interface{})
return nil
}

func ApiRequest(ctx context.Context, r string, req interface{}, res interface{}) error {
return apiRequest(ctx, r, req, res)
}

// The SRS HTTP statistic API.
type statAPI struct {
ctx context.Context
Expand All @@ -98,6 +102,10 @@ func newStatAPI(ctx context.Context) *statAPI {
return &statAPI{ctx: ctx}
}

func NewStatApi(ctx context.Context) *statAPI {
return newStatAPI(ctx)
}

type statGeneral struct {
Code int `json:"code"`
Server string `json:"server"`
Expand Down Expand Up @@ -152,3 +160,7 @@ func (v *statAPI) FilterByStreamSuffix(suffix string) *statAPI {
}
return v
}

func (v *statAPI) Stream() *statStream {
return v.stream
}

0 comments on commit 9f665e2

Please sign in to comment.