Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GB: Support enabling media forwarding through API instead of SIP. #3384

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
110 changes: 67 additions & 43 deletions trunk/3rdparty/srs-bench/gb28181/gb28181.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -24,12 +24,14 @@ import (
"context"
"flag"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"io"
"os"
"strconv"
"strings"
"time"

"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
)

type gbMainConfig struct {
Expand All @@ -50,36 +52,43 @@ func Parse(ctx context.Context) interface{} {
fl.StringVar(&c.sipConfig.domain, "domain", "", "")
fl.IntVar(&c.sipConfig.random, "random", 0, "")

fl.StringVar(&c.psConfig.addr, "mr", "", "")
fl.StringVar(&c.psConfig.ssrc, "ssrc", "", "")
fl.StringVar(&c.psConfig.video, "sv", "", "")
fl.StringVar(&c.psConfig.audio, "sa", "", "")
fl.IntVar(&c.psConfig.fps, "fps", 0, "")

fl.Usage = func() {
fmt.Println(fmt.Sprintf("Usage: %v [Options]", os.Args[0]))
fmt.Println(fmt.Sprintf("Options:"))
fmt.Println(fmt.Sprintf(" -sfu The target SFU, srs or gb28181 or janus. Default: srs"))
fmt.Println(fmt.Sprintf("SIP:"))
fmt.Println(fmt.Sprintf(" -user The SIP username, ID of device."))
fmt.Println(fmt.Sprintf(" -random Append N number to user as random device ID, like 1320000001."))
fmt.Println(fmt.Sprintf(" -server The SIP server ID, ID of server."))
fmt.Println(fmt.Sprintf(" -domain The SIP domain, domain of server and device."))
fmt.Println(fmt.Sprintf("Publisher:"))
fmt.Println(fmt.Sprintf(" -pr The SIP server address, format is tcp://ip:port over TCP."))
fmt.Println(fmt.Sprintf(" -fps [Optional] The fps of .h264 source file."))
fmt.Println(fmt.Sprintf(" -sa [Optional] The file path to read audio, ignore if empty."))
fmt.Println(fmt.Sprintf(" -sv [Optional] The file path to read video, ignore if empty."))
fmt.Println(fmt.Sprintf("\n例如,1个推流:"))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 34020000001320000001 -server 34020000002000000001 -domain 3402000000", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000 -sa avatar.aac -sv avatar.h264 -fps 25", os.Args[0]))
fmt.Println(fmt.Sprintf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user livestream -server srs -domain ossrs.io -sa avatar.aac -sv avatar.h264 -fps 25", os.Args[0]))
fmt.Printf("Usage: %v [Options]\n", os.Args[0])
fmt.Printf("Options:\n")
fmt.Printf(" -sfu The target SFU, srs or gb28181 or janus. Default: srs\n")
fmt.Printf("SIP:\n")
fmt.Printf(" -user The SIP username, ID of device.\n")
fmt.Printf(" -random Append N number to user as random device ID, like 1320000001.\n")
fmt.Printf(" -server The SIP server ID, ID of server.\n")
fmt.Printf(" -domain The SIP domain, domain of server and device.\n")
fmt.Printf("Publisher:\n")
fmt.Printf(" -pr The SIP server address, format is tcp://ip:port over TCP.\n")
fmt.Printf(" -mr The Meida server address, format is tcp://ip:port over TCP.\n")
fmt.Printf(" -ssrc [Optional] The ssrc of rtp packet\n")
fmt.Printf(" -fps [Optional] The fps of .h264 source file.\n")
fmt.Printf(" -sa [Optional] The file path to read audio, ignore if empty.\n")
fmt.Printf(" -sv [Optional] The file path to read video, ignore if empty.\n")
fmt.Printf("\n例如,1个推流:\n")
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 34020000001320000001 -server 34020000002000000001 -domain 3402000000\n", os.Args[0])
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000\n", os.Args[0])
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user 3402000000 -random 10 -server 34020000002000000001 -domain 3402000000 -sa avatar.aac -sv avatar.h264 -fps 25\n", os.Args[0])
fmt.Printf(" %v -sfu gb28181 -pr tcp://127.0.0.1:5060 -user livestream -server srs -domain ossrs.io -sa avatar.aac -sv avatar.h264 -fps 25\n", os.Args[0])
fmt.Printf("\n例如,仅作为媒体服务器,需要提前调用API(/gb/v1/publish)创建通道:\n")
fmt.Printf(" %v -sfu gb28181 -mr tcp://127.0.0.1:9000 -ssrc 1234567890 -sa avatar.aac -sv avatar.h264 -fps 25\n", os.Args[0])

fmt.Println()
}
if err := fl.Parse(os.Args[1:]); err == flag.ErrHelp {
os.Exit(0)
}

showHelp := c.sipConfig.String() == ""
showHelp := c.sipConfig.String() == "" && c.psConfig.addr == ""
if showHelp {
fl.Usage()
os.Exit(-1)
Expand All @@ -98,41 +107,56 @@ func Parse(ctx context.Context) interface{} {
func Run(ctx context.Context, r0 interface{}) (err error) {
conf := r0.(*gbMainConfig)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

var mediaAddr string
var sessionOut GBSessionOutput
if conf.sipConfig.addr != "" {
session := NewGBSession(&GBSessionConfig{
regTimeout: 3 * time.Hour, inviteTimeout: 3 * time.Hour,
}, &conf.sipConfig)
defer session.Close()

if err := session.Connect(ctx); err != nil {
return errors.Wrapf(err, "connect %v", conf.sipConfig)
}

session := NewGBSession(&GBSessionConfig{
regTimeout: 3 * time.Hour, inviteTimeout: 3 * time.Hour,
}, &conf.sipConfig)
defer session.Close()
if err := session.Register(ctx); err != nil {
return errors.Wrapf(err, "register %v", conf.sipConfig)
}

if err := session.Connect(ctx); err != nil {
return errors.Wrapf(err, "connect %v", conf.sipConfig)
}
if err := session.Invite(ctx); err != nil {
return errors.Wrapf(err, "invite %v", conf.sipConfig)
}

if err := session.Register(ctx); err != nil {
return errors.Wrapf(err, "register %v", conf.sipConfig)
}
if mediaAddr, err = utilBuildMediaAddr(session.sip.conf.addr, session.out.mediaPort); err != nil {
return errors.Wrapf(err, "build media addr, sip=%v, mediaPort=%v", session.sip.conf.addr, session.out.mediaPort)
}

if err := session.Invite(ctx); err != nil {
return errors.Wrapf(err, "invite %v", conf.sipConfig)
sessionOut = *session.out
} else if conf.psConfig.addr != "" {
sessionOut.ssrc, err = strconv.ParseInt(conf.psConfig.ssrc, 10, 64)
if err != nil {
return errors.Wrapf(err, "parse ssrc=%v", conf.psConfig.ssrc)
}
sessionOut.clockRate = 90000
sessionOut.payloadType = 96
mediaAddr = conf.psConfig.addr
}

if conf.psConfig.video == "" || conf.psConfig.audio == "" {
cancel()
return nil
return errors.Errorf("video or audio is empty, video=%v, audio=%v", conf.psConfig.video, conf.psConfig.audio)
}

ingester := NewPSIngester(&IngesterConfig{
psConfig: conf.psConfig,
ssrc: uint32(session.out.ssrc),
clockRate: session.out.clockRate,
payloadType: uint8(session.out.payloadType),
ssrc: uint32(sessionOut.ssrc),
clockRate: sessionOut.clockRate,
payloadType: sessionOut.payloadType,
})
defer ingester.Close()

if ingester.conf.serverAddr, err = utilBuildMediaAddr(session.sip.conf.addr, session.out.mediaPort); err != nil {
return err
}

ingester.conf.serverAddr = mediaAddr
if err := ingester.Ingest(ctx); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, video=%v, audio=%v", conf.psConfig.video, conf.psConfig.audio)
Expand Down
82 changes: 79 additions & 3 deletions trunk/3rdparty/srs-bench/gb28181/gb_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,11 +23,18 @@ package gb28181
import (
"context"
"fmt"
"io"
"math/rand"
"strconv"
"sync"
"testing"
"time"

"github.com/ossrs/srs-bench/srs"

"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"testing"
"time"
)

func TestGbPublishRegularly(t *testing.T) {
Expand Down Expand Up @@ -523,3 +530,72 @@ func TestGbPublishUnregister(t *testing.T) {
t.Errorf("err %+v", err)
}
}

func TestGbPublishWithApi(t *testing.T) {
ctx := logger.WithContext(context.Background())
ctx, cancel := context.WithTimeout(ctx, time.Duration(*srsTimeout)*time.Millisecond)
defer cancel()

psConfig := PSConfig{
video: *srsPublishVideo,
fps: *srsPublishVideoFps,
audio: *srsPublishAudio,
}

var r0, r1, r2 error
r0 = func() error {
// generate a random ssrc, and use it as the stream name.
ssrc := rand.Intn(900000000) + 1000000000
stream := strconv.FormatUint(uint64(ssrc), 10)

port, err := apiGbPublishRequest(ctx, stream, strconv.Itoa(ssrc))
if err != nil {
return errors.Wrapf(err, "apiGbPublishRequest stream=%v", stream)
}

ingester := NewPSIngester(&IngesterConfig{
psConfig: psConfig,
ssrc: uint32(ssrc),
clockRate: 90000,
payloadType: 96,
})
defer ingester.Close()

var wg sync.WaitGroup
defer wg.Wait()

// Check the stream is publishing.
wg.Add(1)
go func() {
defer wg.Done()
for ctx.Err() == nil {
stat := srs.NewStatApi(ctx).Streams().FilterByStreamSuffix(stream)
logger.Tf(ctx, "Check publishing, stream=%v", stat.Stream())
if stat.Stream() != nil {
return
}
time.Sleep(1 * time.Second)
}
r1 = ctx.Err()
}()

// Ingest the stream.
wg.Add(1)
go func() {
defer wg.Done()
ingester.conf.serverAddr = "tcp://localhost:" + strconv.Itoa(port)
if err := ingester.Ingest(ctx); err != nil {
if errors.Cause(err) == io.EOF {
logger.Tf(ctx, "EOF, video=%v, audio=%v", psConfig.video, psConfig.audio)
return
}
r2 = err
}
}()
return err
}()

if err := filterTestError(ctx.Err(), r0, r1, r2); err != nil {
t.Errorf("err %+v", err)
}
}
15 changes: 10 additions & 5 deletions trunk/3rdparty/srs-bench/gb28181/ps.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,17 +23,22 @@ package gb28181
import (
"context"
"fmt"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/pion/rtp"
"github.com/yapingcat/gomedia/codec"
"github.com/yapingcat/gomedia/mpeg2"
"math"
"net"
"net/url"
"strings"

"github.com/ossrs/go-oryx-lib/errors"
"github.com/pion/rtp"
"github.com/yapingcat/gomedia/codec"
"github.com/yapingcat/gomedia/mpeg2"
)

type PSConfig struct {
// The media server address, for example: tcp://127.0.0.1:9000
addr string
// The SSRC for rtp.
ssrc string
// The video source file.
video string
// The fps for h264 file.
Expand Down
15 changes: 8 additions & 7 deletions trunk/3rdparty/srs-bench/gb28181/sip.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -23,20 +23,21 @@ package gb28181
import (
"context"
"fmt"
"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transport"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
"math/rand"
"net/url"
"strings"
"sync"
"time"

"github.com/ghettovoice/gosip/log"
"github.com/ghettovoice/gosip/sip"
"github.com/ghettovoice/gosip/transport"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/go-oryx-lib/logger"
)

type SIPConfig struct {
// The server address, for example: tcp://127.0.0.1:5060k
// The server address, for example: tcp://127.0.0.1:5060
addr string
// The SIP domain, for example: ossrs.io or 3402000000
domain string
Expand Down
33 changes: 28 additions & 5 deletions trunk/3rdparty/srs-bench/gb28181/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// The MIT License (MIT)
//
// Copyright (c) 2022 Winlin
// # Copyright (c) 2022 Winlin
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
Expand All @@ -25,17 +25,19 @@ import (
"context"
"flag"
"fmt"
"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/aac"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/yapingcat/gomedia/mpeg2"
"io"
"net"
"net/url"
"os"
"path"
"strings"
"time"

"github.com/ghettovoice/gosip/sip"
"github.com/ossrs/go-oryx-lib/aac"
"github.com/ossrs/go-oryx-lib/errors"
"github.com/ossrs/srs-bench/srs"
"github.com/yapingcat/gomedia/mpeg2"
)

var srsLog *bool
Expand Down Expand Up @@ -359,3 +361,24 @@ func (v *AACReader) NextADTSFrame() ([]byte, error) {

return adts, nil
}

func apiGbPublishRequest(ctx context.Context, stream, ssrc string) (int, error) {
req := struct {
ClientIP string `json:"clientip"`
Stream string `json:"stream"`
SSRC string `json:"ssrc"`
}{
"", stream, ssrc,
}

res := struct {
Code int `json:"code"`
Port int `json:"port"`
}{}

if err := srs.ApiRequest(ctx, "http://localhost:1985/gb/v1/publish/", req, &res); err != nil {
return 0, errors.Wrapf(err, "gb/v1/publish")
}

return res.Port, nil
}