Skip to content

Commit

Permalink
Add a TCP proxy for debugging. v6.0.117 (#3958)
Browse files Browse the repository at this point in the history
When debugging the RTMP protocol, we can capture packets using tcpdump
and then replay the pcap file. For example:

```bash
cd ~/git/srs/trunk/3rdparty/srs-bench/pcap
tcpdump -i any -w t.pcap tcp port 1935
go run . -f ./t.pcap -s 127.0.0.1:1935
```

However, sometimes due to poor network conditions between the server and
the client, there may be many retransmitted packets. In such cases,
setting up a transparent TCP proxy that listens on port 1935 and
forwards to port 19350 can be a solution:

```bash
./objs/srs -c conf/origin.conf 
cd 3rdparty/srs-bench/tcpproxy/ && go run main.go
tcpdump -i any -w t.pcap tcp port 19350
```

This approach allows for the implementation of packet dumping,
multipoint replication, or the provision of detailed timestamps and byte
information at the proxy. It enables the collection of debugging
information without the need to modify the server.



---------

`TRANS_BY_GPT4`

---------

Co-authored-by: john <[email protected]>
  • Loading branch information
winlinvip and xiaozhihong committed Mar 19, 2024
1 parent 26f4ab9 commit ce2ce15
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 6 deletions.
20 changes: 15 additions & 5 deletions trunk/3rdparty/srs-bench/pcap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"net"
"os"
"strings"
"time"

"github.com/google/gopacket"
Expand Down Expand Up @@ -62,9 +63,19 @@ func doMain(ctx context.Context) error {
}
defer f.Close()

r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions)
if err != nil {
return errors.Wrapf(err, "new reader")
var source *gopacket.PacketSource
if strings.HasSuffix(filename, ".pcap") {
r, err := pcapgo.NewReader(f)
if err != nil {
return errors.Wrapf(err, "new reader")
}
source = gopacket.NewPacketSource(r, r.LinkType())
} else {
r, err := pcapgo.NewNgReader(f, pcapgo.DefaultNgReaderOptions)
if err != nil {
return errors.Wrapf(err, "new reader")
}
source = gopacket.NewPacketSource(r, r.LinkType())
}

// TODO: FIXME: Should start a goroutine to consume bytes from conn.
Expand All @@ -76,7 +87,6 @@ func doMain(ctx context.Context) error {

var packetNumber uint64
var previousTime *time.Time
source := gopacket.NewPacketSource(r, r.LinkType())
for packet := range source.Packets() {
packetNumber++

Expand All @@ -90,7 +100,7 @@ func doMain(ctx context.Context) error {
if len(payload) == 0 {
continue
}
if tcp.DstPort != 1935 {
if tcp.DstPort != 1935 && tcp.DstPort != 19350 {
continue
}

Expand Down
159 changes: 159 additions & 0 deletions trunk/3rdparty/srs-bench/tcpproxy/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package main

import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"fmt"
"net"
"sync"
"time"
)

func main() {
if err := doMain(); err != nil {
panic(err)
}
}

func doMain() error {
hashID := buildHashID()

listener, err := net.Listen("tcp", ":1935")
if err != nil {
return err
}
trace(hashID, "Listen at %v", listener.Addr())

for {
client, err := listener.Accept()
if err != nil {
return err
}

backend, err := net.Dial("tcp", "localhost:19350")
if err != nil {
return err
}

go serve(client, backend)
}
return nil
}

func serve(client, backend net.Conn) {
defer client.Close()
defer backend.Close()
hashID := buildHashID()
if err := doServe(hashID, client, backend); err != nil {
trace(hashID, "Serve error %v", err)
}
}

func doServe(hashID string, client, backend net.Conn) error {
var wg sync.WaitGroup
var r0 error

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if c, ok := client.(*net.TCPConn); ok {
c.SetNoDelay(true)
}
if c, ok := backend.(*net.TCPConn); ok {
c.SetNoDelay(true)
}

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

for {
buf := make([]byte, 128*1024)
nn, err := client.Read(buf)
if err != nil {
trace(hashID, "Read from client error %v", err)
r0 = err
return
}
if nn == 0 {
trace(hashID, "Read from client EOF")
return
}

_, err = backend.Write(buf[:nn])
if err != nil {
trace(hashID, "Write to RTMP backend error %v", err)
r0 = err
return
}

trace(hashID, "Copy %v bytes to RTMP backend", nn)
}
}()

wg.Add(1)
go func() {
defer wg.Done()
defer cancel()

for {
buf := make([]byte, 128*1024)
nn, err := backend.Read(buf)
if err != nil {
trace(hashID, "Read from RTMP backend error %v", err)
r0 = err
return
}
if nn == 0 {
trace(hashID, "Read from RTMP backend EOF")
return
}

_, err = client.Write(buf[:nn])
if err != nil {
trace(hashID, "Write to client error %v", err)
r0 = err
return
}

trace(hashID, "Copy %v bytes to RTMP client", nn)
}
}()

wg.Add(1)
go func() {
defer wg.Done()

defer client.Close()
defer backend.Close()

<-ctx.Done()
trace(hashID, "Context is done, close the connections")
}()

trace(hashID, "Start proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr())
wg.Wait()
trace(hashID, "Finish proxing client %v over %v to backend %v", client.RemoteAddr(), backend.LocalAddr(), backend.RemoteAddr())

return r0
}

func trace(id, msg string, a ...interface{}) {
fmt.Println(fmt.Sprintf("[%v][%v] %v",
time.Now().Format("2006-01-02 15:04:05.000"), id,
fmt.Sprintf(msg, a...),
))
}

func buildHashID() string {
randomData := make([]byte, 16)
if _, err := rand.Read(randomData); err != nil {
return ""
}

hash := sha256.Sum256(randomData)
return hex.EncodeToString(hash[:])[:6]
}
1 change: 1 addition & 0 deletions trunk/doc/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ The changelog for SRS.
<a name="v6-changes"></a>

## SRS 6.0 Changelog
* v6.0, 2024-03-19, Merge [#3958](https://github.com/ossrs/srs/pull/3958): Add a TCP proxy for debugging. v6.0.117 (#3958)
* v6.0, 2024-03-20, Merge [#3964](https://github.com/ossrs/srs/pull/3964): WebRTC: Add support for A/V only WHEP/WHEP player. v6.0.116 (#3964)
* v6.0, 2024-03-19, Merge [#3990](https://github.com/ossrs/srs/pull/3990): System: Disable feature that obtains versions and check features status. v6.0.115 (#3990)
* v6.0, 2024-03-18, Merge [#3973](https://github.com/ossrs/srs/pull/3973): Typo: Fix some typo for #3973 #3976 #3982. v6.0.114 (#3973)
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/core/srs_core_version6.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@

#define VERSION_MAJOR 6
#define VERSION_MINOR 0
#define VERSION_REVISION 116
#define VERSION_REVISION 117

#endif

0 comments on commit ce2ce15

Please sign in to comment.