Skip to content

Commit

Permalink
The new feature of srs_bench: pushing RTP (with payload type PS) over…
Browse files Browse the repository at this point in the history
… TCP.
  • Loading branch information
duiniuluantanqin committed Mar 11, 2024
1 parent 6c651e7 commit ba42a40
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 64 deletions.
118 changes: 71 additions & 47 deletions trunk/3rdparty/srs-bench/gb28181/gb28181.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -24,12 +24,14 @@ import (
"context"
"flag"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"io"
"os"
"strconv"
"strings"
"time"

"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
)

type gbMainConfig struct {
Expand All @@ -50,40 +52,47 @@ func Parse(ctx context.Context) interface{} {
fl.StringVar(&c.sipConfig.domain, "domain", "", "")
fl.IntVar(&c.sipConfig.random, "random", 0, "")

fl.StringVar(&c.psConfig.addr, "mr", "", "")
fl.StringVar(&c.psConfig.ssrc, "ssrc", "", "")
fl.StringVar(&c.psConfig.video, "sv", "", "")
fl.StringVar(&c.psConfig.audio, "sa", "", "")
fl.IntVar(&c.psConfig.fps, "fps", 0, "")

fl.Usage = func() {
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
fmt.Println(fmt.Sprintf("Options:"))
fmt.Println(fmt.Sprintf(" -sfu The target SFU, srs or gb28181 or janus. Default: srs"))
fmt.Println(fmt.Sprintf("SIP:"))
fmt.Println(fmt.Sprintf(" -user The SIP username, ID of device."))
fmt.Println(fmt.Sprintf(" -random Append N number to user as random device ID, like 1320000001."))
fmt.Println(fmt.Sprintf(" -server The SIP server ID, ID of server."))
fmt.Println(fmt.Sprintf(" -domain The SIP domain, domain of server and device."))
fmt.Println(fmt.Sprintf("Publisher:"))
fmt.Println(fmt.Sprintf(" -pr The SIP server address, format is tcp://ip:port over TCP."))
fmt.Println(fmt.Sprintf(" -fps [Optional] The fps of .h264 source file."))
fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty."))
fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty."))
fmt.Println(fmt.Sprintf("\n例如,1个推流:"))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 34020000001320000001 -server 34020000002000000001 -domain 3402000000", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000 -sa avatar.aac -sv avatar.h264 -fps 25", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user livestream -server srs -domain ossrs.io -sa avatar.aac -sv avatar.h264 -fps 25", os.Args[0]))
fmt.Printf("Usage: %v [Options]\n", os.Args[0])
fmt.Printf("Options:\n")
fmt.Printf(" -sfu The target SFU, srs or gb28181 or janus. Default: srs\n")
fmt.Printf("SIP:\n")
fmt.Printf(" -user The SIP username, ID of device.\n")
fmt.Printf(" -random Append N number to user as random device ID, like 1320000001.\n")
fmt.Printf(" -server The SIP server ID, ID of server.\n")
fmt.Printf(" -domain The SIP domain, domain of server and device.\n")
fmt.Printf("Publisher:\n")
fmt.Printf(" -pr The SIP server address, format is tcp://ip:port over TCP.\n")
fmt.Printf(" -mr The Meida server address, format is tcp://ip:port over TCP.\n")
fmt.Printf(" -ssrc [Optional] The ssrc of rtp packet\n")
fmt.Printf(" -fps [Optional] The fps of .h264 source file.\n")
fmt.Printf(" -sa [Optional] The file path to read audio, ignore if empty.\n")
fmt.Printf(" -sv [Optional] The file path to read video, ignore if empty.\n")
fmt.Printf("\n例如,1个推流:\n")
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 34020000001320000001 -server 34020000002000000001 -domain 3402000000\n", os.Args[0])
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000\n", os.Args[0])
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000 -sa avatar.aac -sv avatar.h264 -fps 25\n", os.Args[0])
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user livestream -server srs -domain ossrs.io -sa avatar.aac -sv avatar.h264 -fps 25\n", os.Args[0])
fmt.Printf("\n例如,仅作为媒体服务器,需要提前调用API(/gb/v1/publish)创建通道:\n")
fmt.Printf(" %v -sfu gb28181 -mr tcp://127.0.0.1:9000 -ssrc 1234567890 -sa avatar.aac -sv avatar.h264 -fps 25\n", os.Args[0])

fmt.Println()
}
if err := fl.Parse(os.Args[1:]); err == flag.ErrHelp {
os.Exit(0)
}

showHelp := c.sipConfig.String() == ""
if showHelp {
fl.Usage()
os.Exit(-1)
}
// showHelp := c.sipConfig.String() == ""
// if showHelp {
// fl.Usage()
// os.Exit(-1)
// }

summaryDesc := ""
if c.sipConfig.addr != "" {
Expand All @@ -98,41 +107,56 @@ func Parse(ctx context.Context) interface{} {
func Run(ctx context.Context, r0 interface{}) (err error) {
conf := r0.(*gbMainConfig)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var mediaAddr string
var sessionOut GBSessionOutput
if conf.sipConfig.addr != "" {
session := NewGBSession(&GBSessionConfig{
regTimeout: 3 * time.Hour, inviteTimeout: 3 * time.Hour,
}, &conf.sipConfig)
defer session.Close()

if err := session.Connect(ctx); err != nil {
return errors.Wrapf(err, "connect %v", conf.sipConfig)
}

session := NewGBSession(&GBSessionConfig{
regTimeout: 3 * time.Hour, inviteTimeout: 3 * time.Hour,
}, &conf.sipConfig)
defer session.Close()
if err := session.Register(ctx); err != nil {
return errors.Wrapf(err, "register %v", conf.sipConfig)
}

if err := session.Connect(ctx); err != nil {
return errors.Wrapf(err, "connect %v", conf.sipConfig)
}
if err := session.Invite(ctx); err != nil {
return errors.Wrapf(err, "invite %v", conf.sipConfig)
}

if err := session.Register(ctx); err != nil {
return errors.Wrapf(err, "register %v", conf.sipConfig)
}
if mediaAddr, err = utilBuildMediaAddr(session.sip.conf.addr, session.out.mediaPort); err != nil {
return errors.Wrapf(err, "build media addr, sip=%v, mediaPort=%v", session.sip.conf.addr, session.out.mediaPort)
}

if err := session.Invite(ctx); err != nil {
return errors.Wrapf(err, "invite %v", conf.sipConfig)
sessionOut = *session.out
} else if conf.psConfig.addr != "" {
sessionOut.ssrc, err = strconv.ParseInt(conf.psConfig.ssrc, 10, 64)
if err != nil {
return errors.Wrapf(err, "parse ssrc=%v", conf.psConfig.ssrc)
}
sessionOut.clockRate = 90000
sessionOut.payloadType = 96
mediaAddr = conf.psConfig.addr
}

if conf.psConfig.video == "" || conf.psConfig.audio == "" {
cancel()
return nil
return errors.Errorf("video or audio is empty, video=%v, audio=%v", conf.psConfig.video, conf.psConfig.audio)
}

ingester := NewPSIngester(&IngesterConfig{
psConfig: conf.psConfig,
ssrc: uint32(session.out.ssrc),
clockRate: session.out.clockRate,
payloadType: uint8(session.out.payloadType),
ssrc: uint32(sessionOut.ssrc),
clockRate: sessionOut.clockRate,
payloadType: sessionOut.payloadType,
})
defer ingester.Close()

if ingester.conf.serverAddr, err = utilBuildMediaAddr(session.sip.conf.addr, session.out.mediaPort); err != nil {
return err
}

ingester.conf.serverAddr = mediaAddr
if err := ingester.Ingest(ctx); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, video=%v, audio=%v", conf.psConfig.video, conf.psConfig.audio)
Expand Down
15 changes: 10 additions & 5 deletions trunk/3rdparty/srs-bench/gb28181/ps.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,17 +23,22 @@ package gb28181
import (
"context"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/pion/rtp"
"github.com/yapingcat/gomedia/codec"
"github.com/yapingcat/gomedia/mpeg2"
"math"
"net"
"net/url"
"strings"

"github.com/ossrs/go-oryx-lib/errors"
"github.com/pion/rtp"
"github.com/yapingcat/gomedia/codec"
"github.com/yapingcat/gomedia/mpeg2"
)

type PSConfig struct {
// The media server address, for example: tcp://127.0.0.1:9000
addr string
// The SSRC for rtp.
ssrc string
// The video source file.
video string
// The fps for h264 file.
Expand Down
15 changes: 8 additions & 7 deletions trunk/3rdparty/srs-bench/gb28181/sip.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,20 +23,21 @@ package gb28181
import (
"context"
"fmt"
"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transport"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"math/rand"
"net/url"
"strings"
"sync"
"time"

"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transport"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
)

type SIPConfig struct {
// The server address, for example: tcp://127.0.0.1:5060k
// The server address, for example: tcp://127.0.0.1:5060
addr string
// The SIP domain, for example: ossrs.io or 3402000000
domain string
Expand Down
65 changes: 60 additions & 5 deletions trunk/3rdparty/srs-bench/gb28181/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,19 +23,24 @@ package gb28181
import (
"bufio"
"context"
"encoding/json"
"flag"
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/aac"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/yapingcat/gomedia/mpeg2"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"path"
"strings"
"time"

"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/aac"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"github.com/yapingcat/gomedia/mpeg2"
)

var srsLog *bool
Expand Down Expand Up @@ -359,3 +364,53 @@ func (v *AACReader) NextADTSFrame() ([]byte, error) {

return adts, nil
}

// Request SRS GB API, the apiPath like "/gb/v1/publish",
//
// Return the response of answer in string.
func apiGbRequest(ctx context.Context, apiPath, host, stream, ssrc string) (string, error) {
api := host
if !strings.HasPrefix(apiPath, "/") {
api += "/"
}
api += apiPath

if !strings.HasSuffix(apiPath, "/") {
api += "/"
}

// Build JSON body.
reqBody := struct {
ClientIP string `json:"clientip"`
Stream string `json:"stream"`
SSRC string `json:"ssrc"`
}{
host, stream, ssrc,
}

b, err := json.Marshal(reqBody)
if err != nil {
return "", errors.Wrapf(err, "Marshal body %v", reqBody)
}
logger.If(ctx, "Request url api=%v with %v", api, string(b))
logger.Tf(ctx, "Request url api=%v with %v bytes", api, len(b))

req, err := http.NewRequest("POST", api, strings.NewReader(string(b)))
if err != nil {
return "", errors.Wrapf(err, "HTTP request %v", string(b))
}

res, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return "", errors.Wrapf(err, "Do HTTP request %v", string(b))
}

b2, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", errors.Wrapf(err, "Read response for %v", string(b))
}
logger.If(ctx, "Response from %v is %v", api, string(b2))
logger.Tf(ctx, "Response from %v is %v bytes", api, len(b2))

return string(b2), nil
}

0 comments on commit ba42a40

Please sign in to comment.