webrtc: Data channel fails to send packets at a frequently speed when works as a rtc server and connected by a browser.

Hi all,

I’m trying to test the bandwidth of the Data Channel based on your example

I didn’t change too many codes except the data channel callbacks. I tried to send as much as possible to the client when a data channel was created (will show in the codes later). And with a controllable delay, I tried send frequency at10Hz, 20Hz, 25Hz, 40Hz, 50Hz…etc

The problem is, when it comes to over than 40Hz, the data received in the browser mismatches the data I’m trying to send. With the send frequency set to 40Hz (65535 bytes per send call), I’ve got the throughput jitter sharply and the browser receives fewer packets than I sent. image The gap increases when the send frequency becomes larger. (Note here in the picture 192.168.1.2 is my webrtc server which runs the codes and sends data)

With the send frequency set to 25Hz (65535 bytes per send call as well), I’ve got the throughput smoothly and the browser receives all the data I sent. image

I’ve got no idea what is going wrong, please if anyone here can help?

P.S. Below are my code snippets if they can be helpful.

I rewrite these codes https://github.com/webrtc-rs/webrtc/blob/20b59b7a40663c7546b105d7cd698ca4655a1fc4/examples/examples/data-channels/data-channels.rs#L117-L155 to the following

    // Register data channel creation handling
    peer_connection.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
        let d_label = d.label().to_owned();
        let d_id = d.id();
        println!("New DataChannel {} {}", d_label, d_id);

        // Register channel opening handling
        Box::pin(async move {
            use tokio::time;
            let d2 = Arc::clone(&d);
            let d_label2 = d_label.clone();
            let d_id2 = d_id;

            d.on_open(Box::new(move || {
                println!("Data channel '{}'-'{}' open.", d_label2, d_id2);
                let mut buf = Vec::with_capacity(65535);
                let mut current = time::Instant::now();
                let mut pkt_num = 0_usize;
                unsafe {
                    buf.set_len(65535);
                }

                Box::pin(async move {
                    while d2.send(&buf.to_vec().into()).await.is_ok() {
                        pkt_num  += 1;
                        time::sleep(time::Duration::from_millis(40)).await;
                        if current.elapsed().as_secs() > 1 {
                            println!("current send {} packets", pkt_num);
                            current = time::Instant::now();
                        }
                    }
                })
            }));

            // Register text message handling
            d.on_message(Box::new(move |_: DataChannelMessage| {
                Box::pin(async move {})
            }));

            d.on_close(Box::new(|| std::process::exit(0)));
        })
    }));

And my Javascript codes are quite simple as shown below

let pc = new RTCPeerConnection({
    iceServers: [
        {
            urls: 'stun:stun.l.google.com:19302'
        }
    ]
})

var recv_bytes = 0;
var pkts_num = 0;
let interval = null;

let sendChannel = pc.createDataChannel('foo')
sendChannel.onclose = () => {
    console.log('sendChannel has closed')
    clearInterval(interval);
}
sendChannel.onopen = () => {
    console.log('sendChannel has opened')
    interval = setInterval(() => {
        console.log("Recv " + pkts_num + " packets. Current rate: " + recv_bytes + " B/s");
        recv_bytes = 0;
    }, 1000);
}
sendChannel.onmessage = e => {
    recv_bytes += e.data.byteLength
    pkts_num += 1
    // sendChannel.send(e.data)
}

pc.oniceconnectionstatechange = e => console.log(pc.iceConnectionState)
pc.onnegotiationneeded = e =>
    pc.createOffer().then(d => {
        pc.setLocalDescription(d)
        fetch('/sdp', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/x-www-form-urlencoded'
            },
            body: "client=" + btoa(JSON.stringify(d))
        }).then(response => response.text().then(response => {
            pc.setRemoteDescription(new RTCSessionDescription(JSON.parse(atob(response))))
        }))
    }).catch(console.log)

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 32 (24 by maintainers)

Commits related to this issue

Most upvoted comments

The three PRs combined have this effect for me:

Current master

Throughput: 58522755 Bytes/s, 893 pkts, 893 loops
Send 89902 pkts
Throughput: 60030060 Bytes/s, 916 pkts, 916 loops
Send 90575 pkts
Throughput: 60161130 Bytes/s, 918 pkts, 918 loops
Send 89910 pkts
Throughput: 59374710 Bytes/s, 906 pkts, 906 loops
Send 89765 pkts
Throughput: 58850430 Bytes/s, 898 pkts, 898 loops

All three PRs combined

Throughput: 103938510 Bytes/s, 1586 pkts, 1586 loops
Send 83505 pkts
Throughput: 139196340 Bytes/s, 2124 pkts, 2124 loops
Send 80997 pkts
Throughput: 136247265 Bytes/s, 2079 pkts, 2079 loops
Send 83435 pkts
Throughput: 136706010 Bytes/s, 2086 pkts, 2086 loops
Send 84264 pkts
Throughput: 140244900 Bytes/s, 2140 pkts, 2140 loops

Still not at pion levels but the next big botlleneck that exists now seems to be the CRC32 impementation.

@HsuJv If you want to try this out: I pushed a branch with the current state of the three PRs merged here: https://github.com/KillingSpark/webrtc/tree/merged

Yup it’s lock contention. Specifically on the Mutex around the InternalAssociation struct.

Occupied time in readloop measures time used to process an SACK and in writeloop it’s gathering packets to send.

Taking time is just measuring the time it took to actually lock the mutex before doing the above operation.

It seems pretty clear to me that the SACK processing is stalled by the lock contention. The solution to this is probably non-trivial. Maybe instead of making the writeloop gather packets to send, the packets should be put into a channel either when they are queued and can be sent immediatly within the rwnd or when a SACK arrives that increases the rwnd?

Writeloop lock occupied: 1431us
readloop lock occupied:  43us
Writeloop lock occupied: 38us
Writeloop lock occupied: 1us
Writeloop lock occupied: 1548us
readloop lock occupied:  46us
Writeloop lock occupied: 17us
Writeloop lock occupied: 0us
Writeloop lock occupied: 1498us
readloop lock occupied:  47us
Writeloop lock occupied: 66us
Writeloop lock occupied: 1us
Writeloop lock occupied: 1412us
readloop lock occupied:  43us
Writeloop lock occupied: 19us
Writeloop lock occupied: 0us
Writeloop lock occupied: 1426us
readloop lock occupied:  43us
Writeloop lock occupied: 19us
Writeloop lock occupied: 1us
Writeloop lock occupied: 1552us
readloop lock occupied:  68us
Writeloop lock occupied: 19us
Writeloop lock occupied: 0us
Writeloop lock occupied: 1641us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  791us
Taking Writeloop lock: 66us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  825us
Taking Writeloop lock: 65us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  870us
Taking Writeloop lock: 68us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  861us
Taking Writeloop lock: 71us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  846us
Taking Writeloop lock: 71us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  863us
Taking Writeloop lock: 68us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  908us
Taking Writeloop lock: 69us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  810us
Taking Writeloop lock: 64us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us
Taking readloop lock:  885us
Taking Writeloop lock: 72us
Taking Writeloop lock: 0us
Taking Writeloop lock: 0us

Modified code for better understanding of the measurements above:

    async fn read_loop(
        name: String,
        bytes_received: Arc<AtomicUsize>,
        net_conn: Arc<dyn Conn + Send + Sync>,
        mut close_loop_ch: broadcast::Receiver<()>,
        association_internal: Arc<Mutex<AssociationInternal>>,
    ) {
        log::debug!("[{}] read_loop entered", name);

        let mut buffer = vec![0u8; RECEIVE_MTU];
        let mut done = false;
        let mut n;
        while !done {
            tokio::select! {
                _ = close_loop_ch.recv() => break,
                result = net_conn.recv(&mut buffer) => {
                    match result {
                        Ok(m) => {
                            n=m;
                        }
                        Err(err) => {
                            log::warn!("[{}] failed to read packets on net_conn: {}", name, err);
                            break;
                        }
                    }
                }
            };

            // Make a buffer sized to what we read, then copy the data we
            // read from the underlying transport. We do this because the
            // user data is passed to the reassembly queue without
            // copying.
            log::debug!("[{}] recving {} bytes", name, n);
            let inbound = Bytes::from(buffer[..n].to_vec());
            bytes_received.fetch_add(n, Ordering::SeqCst);

            {
                let x = std::time::Instant::now();
                let mut ai = association_internal.lock().await;
                if inbound.len() < 1200 {
                    eprintln!("Taking readloop lock:  {}us", x.elapsed().as_micros());
                }

                let x = std::time::Instant::now();
                if let Err(err) = ai.handle_inbound(&inbound).await {
                    log::warn!("[{}] failed to handle_inbound: {:?}", name, err);
                    done = true;
                }
                if inbound.len() < 1200 {
                    eprintln!("readloop lock occupied:  {}us", x.elapsed().as_micros());
                }
            }
        }

        {
            let mut ai = association_internal.lock().await;
            if let Err(err) = ai.close().await {
                log::warn!("[{}] failed to close association: {:?}", name, err);
            }
        }

        log::debug!("[{}] read_loop exited", name);
    }

    async fn write_loop(
        name: String,
        bytes_sent: Arc<AtomicUsize>,
        net_conn: Arc<dyn Conn + Send + Sync>,
        mut close_loop_ch: broadcast::Receiver<()>,
        association_internal: Arc<Mutex<AssociationInternal>>,
        mut awake_write_loop_ch: mpsc::Receiver<()>,
    ) {
        log::debug!("[{}] write_loop entered", name);
        let mut done = false;
        while !done {
            //log::debug!("[{}] gather_outbound begin", name);
            let (raw_packets, mut ok) = {
                let x = std::time::Instant::now();
                let mut ai = association_internal.lock().await;
                eprintln!("Taking Writeloop lock: {}us", x.elapsed().as_micros());

                let x = std::time::Instant::now();
                let r = ai.gather_outbound().await;
                eprintln!("Writeloop lock occupied: {}us", x.elapsed().as_micros());
                r
            };
            //log::debug!("[{}] gather_outbound done with {}", name, raw_packets.len());

            for raw in &raw_packets {
                log::debug!("[{}] sending {} bytes", name, raw.len());
                if let Err(err) = net_conn.send(raw).await {
                    log::warn!("[{}] failed to write packets on net_conn: {}", name, err);
                    ok = false;
                    break;
                } else {
                    bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
                }
                //log::debug!("[{}] sending {} bytes done", name, raw.len());
            }

            if !ok {
                break;
            }

            if raw_packets.is_empty() {
                //log::debug!("[{}] wait awake_write_loop_ch", name);
                tokio::select! {
                    _ = awake_write_loop_ch.recv() =>{}
                    _ = close_loop_ch.recv() => {
                        done = true;
                    }
                };
            }
            //log::debug!("[{}] wait awake_write_loop_ch done", name);
        }

        {
            let mut ai = association_internal.lock().await;
            if let Err(err) = ai.close().await {
                log::warn!("[{}] failed to close association: {:?}", name, err);
            }
        }

        log::debug!("[{}] write_loop exited", name);
    }

Edit: My current suspicion is, that the marshalling of the packets is the culprit, because it is done under while holding the lock. I think it is easy enough to fix this and maybe performance will be good enough without bigger changes to the architecture

TLDR for this whole issue:

  1. The lock around AssociationInternal is contended by the write and read loop
  2. If one of the loops is doing too much work in one iteration it chokes the other one, the write_loop is especially prone of doing this because it naturally batches its work
  3. This can be solved (or at least drastically improved) by pulling the marshalling code up into the write_loop, outside the critical section making the total time spent in the critical section significantly smaller
  4. After this improvement send performance is only being bottlenecked by two functions
    1. Packet::marshal() <- There are some low hanging optimizations here
    2. AssociationInternal::gather_outbound() <- This actually still chokes the read_loop so optimizing this is also important

Separately I noticed that the pending queue does not apply backpressure so if the sender continously sends faster than the connection is allowed to send this queue will grow indefinitely and in the long run cause an OOM.

Way forward

@rainliu @k0nserv First of all sorry for hijacking (and kinda spamming 😄) an issue on your repo I hope that’s ok. I got a bit carried away.

Do you have any objections to the changes I did in the write_loop (see comment directly above) ? Obviously the .unwrap()s need to be handled more gracefully.

If not I’d prepare two PRs one for the changes in the write_loop and one for the optimizations to the marshal code. And probably a new issue for the pending queue not applying back pressure.

Yes it gets smoothly at around 130MB BTW, In my last comment, the number was 20-40 MB & 70 MB. I did a wrong calculation.

accepted a stream
Throughput: 262140 Bytes/s, 4 pkts, 4 loops
Send 15279 pkts
Throughput: 95550030 Bytes/s, 1458 pkts, 1458 loops
Send 1850 pkts
Throughput: 130152510 Bytes/s, 1986 pkts, 1986 loops
Send 1981 pkts
Throughput: 131594280 Bytes/s, 2008 pkts, 2008 loops
Send 2008 pkts
Throughput: 132315165 Bytes/s, 2019 pkts, 2019 loops
Send 2030 pkts
Throughput: 131528745 Bytes/s, 2007 pkts, 2007 loops
Send 1995 pkts
Throughput: 131070000 Bytes/s, 2000 pkts, 2000 loops
Send 1996 pkts
Throughput: 129824835 Bytes/s, 1981 pkts, 1981 loops

(My server always runs some heavy tasks, so it is expected that the performance is lower than my WSL)

Definitely should do that. I have some ideas for that as well (and prototyping shows it increases throughput even more) but I wanted to get these PRs merged before starting another one 😃

Edit: nevermind, found a good solution and couldn’t wait

Excellent research! Sounds like your changes are promising @KillingSpark. Please roll it up into PRs for review

Ok so my suspicion was right. Pulling the marshalling our from under the lock reduces the time the mutex is locked in total drastically…but… Even if we trick tokio into not running the read_loop and write_loop on the same thread we see something unfortunate:

sender Writeloop body took: 7789us for 537836 Bytes   <------- mostly Packet::marshal() + sending over socket
sender Writeloop slept for: 0us                       <------- The write_loop is never ideling (as expected :))
sender Try taking writeloop lock
sender Taking Writeloop lock: 3us
sender Try taking readloop lock on ThreadId(23)
sender Writeloop lock occupied: 615us                 <------- Mostly AssociationInternal::gather_outbound()
sender Run write body on ThreadId(5)                    
sender Taking readloop lock:  837us                   <------- Waiting on write_loop to release lock

537836 Bytes in 7789 microseconds is 69.050.712 Bytes/s and since the write_loop is not ideling, send performance is entirely bottlenecked by:

  1. Packet::marshal()
  2. AssociationInternal::gather_outbound() (which I hope is only this slow because the queue builds up to this ridiculous size?)
  3. sending over socket

I can put together a PR that provides the behaviour above, which would allow optimizations on Packet::marshal to result in immediate throughput benefits

Doing a few low hanging optimizations on the marshaling code gets me to this throughput

Throughput: 95550030 Bytes/s, 1458 pkts, 1458 loops

95MByte/s is still not great but it’s something.

Changes to the write_loop

    async fn write_loop(
        name: String,
        bytes_sent: Arc<AtomicUsize>,
        net_conn: Arc<dyn Conn + Send + Sync>,
        mut close_loop_ch: broadcast::Receiver<()>,
        association_internal: Arc<Mutex<AssociationInternal>>,
        mut awake_write_loop_ch: mpsc::Receiver<()>,
    ) {
        log::debug!("[{}] write_loop entered", name);
        let mut done = false;
        while !done {
            //log::debug!("[{}] gather_outbound begin", name);
            let (raw_packets, mut ok) = {
                let x = std::time::Instant::now();
                eprintln!("{name} Try taking writeloop lock");
                let mut ai = association_internal.lock().await;
                eprintln!(
                    "{name} Taking Writeloop lock: {}us",
                    x.elapsed().as_micros()
                );

                let x = std::time::Instant::now();
                let r = ai.gather_outbound().await;
                drop(ai);
                eprintln!(
                    "{name} Writeloop lock occupied: {}us",
                    x.elapsed().as_micros()
                );
                r
            };

            let x = std::time::Instant::now();
            eprintln!("{name} Run write body on {:?}", std::thread::current().id());

            let name2 = name.clone();
            let net_conn = Arc::clone(&net_conn);
            let bytes_sent = Arc::clone(&bytes_sent);

            // THIS IS IMPORTANT
            // This task::spawn makes tokio spawn this on another thread, allowing the read_loop
            // to make progress while we send out this batch of packets
            let bytes_sent = tokio::task::spawn(async move {
                let mut b = 0;
                for raw in raw_packets {
                    let raw = raw.marshal().unwrap();
                    if let Err(err) = net_conn.send(raw.as_ref()).await {
                        ok = false;
                        break;
                    } else {
                        b += raw.len();
                        bytes_sent.fetch_add(raw.len(), Ordering::SeqCst);
                    }
                    //log::debug!("[{}] sending {} bytes done", name, raw.len());
                }
                b
                //log::debug!("[{}] gather_outbound done with {}", name, raw_packets.len());
            })
            .await
            .unwrap();

            if !ok {
                break;
            }

            eprintln!(
                "{name} Writeloop body took: {}us for {bytes_sent}Bytes",
                x.elapsed().as_micros()
            );
            //log::debug!("[{}] wait awake_write_loop_ch", name);

            let x = std::time::Instant::now();
            tokio::select! {
                _ = awake_write_loop_ch.recv() =>{}
                _ = close_loop_ch.recv() => {
                    done = true;
                }
            };
            eprintln!("{name} Writeloop slept for: {}us", x.elapsed().as_micros());
            //log::debug!("[{}] wait awake_write_loop_ch done", name);
        }

        {
            let mut ai = association_internal.lock().await;
            if let Err(err) = ai.close().await {
                log::warn!("[{}] failed to close association: {:?}", name, err);
            }
        }

        log::debug!("[{}] write_loop exited", name);
    }

Didn’t test the UDP case, but I believe your numbers. The SCTP example you provided is very slow for me too, around 100is packets instead of your 76ish. In a release build I can get up to 900 packets though. That’s still way less than it probably should be, but goes to show how good the optimizer is 😃

A profile I did on a release build shows this:

This stack (37% of total execution time) is concerned with marshalling packets (20% of total execution time) and unmarshaling packets (6% of total execution time) image

This stack (about 29%) is… blocking on a threadpool? Seems like tokio internals but maybe this is influenced by the usage of tokio in this library? image

I performed this with Reliability::Rexmit 0, so the sender performance should not rely on the receiver performance

My logs look like this:

Send 71410 pkts Throughput: 42728820 Bytes/s, 652 pkts, 652 loops Send 68634 pkts Throughput: 41155980 Bytes/s, 628 pkts, 628 loops Send 71338 pkts Throughput: 42794355 Bytes/s, 653 pkts, 653 loops Send 68920 pkts Throughput: 39386535 Bytes/s, 601 pkts, 601 loops Send 68421 pkts Throughput: 41024910 Bytes/s, 626 pkts, 626 loops Send 69504 pkts Throughput: 41549190 Bytes/s, 634 pkts, 634 loops

All in all this suggest to me a few things:

  • The marshalling could maybe be improved, but it taking a lot of CPU time is expected, it is pumping out 70k pkts/s
  • The receiving part is doing SOMETHING wrong. About 99% of the packets seem to be dropped, according to the mismatch between the amount of sent and received packets.
  • Since packet unmarshalling is only using 6% of the total runtime the packets seem to get dropped somewhere before the processing of the data even begins

My speculation is: For some reason tokio does not read packets fast enough from the socket, resulting in the processing code not being called often and causing many packets to be dropped by the kernel because the socket queue is overflowing.

That there is that much time spent on blocking on a threadpool seems fishy and could be related. Not sure though.

Edit:

A quick look into netstat confirms that the receive queue is overflowing. Just gotta figure out why the packets are not retrieved fast enough.

Proto Recv-Q Send-Q  Local Address          Foreign Address        (state)
udp4    1680      0  localhost.28300        localhost.10000

I had a look around in the code and maybe there is just high contention on the reassembly queue lock? That would maybe explain the blocking on parked threads?

It seems something is wrong within the SCTP

I just wrote a simple POC to do a throughput test

use clap::{App, AppSettings, Arg};
use std::io::Write;
use std::sync::Arc;
use tokio::net::UdpSocket;
use util::{conn::conn_disconnected_packet::DisconnectedPacketConn, Conn};
use webrtc_sctp::association::*;
use webrtc_sctp::chunk::chunk_payload_data::PayloadProtocolIdentifier;
use webrtc_sctp::stream::*;
use webrtc_sctp::Error;

fn main() -> Result<(), Error> {
    env_logger::Builder::new()
        .format(|buf, record| {
            writeln!(
                buf,
                "{}:{} [{}] {} - {}",
                record.file().unwrap_or("unknown"),
                record.line().unwrap_or(0),
                record.level(),
                chrono::Local::now().format("%H:%M:%S.%6f"),
                record.args()
            )
        })
        .filter(None, log::LevelFilter::Warn)
        .init();

    let mut app = App::new("SCTP Throughput")
        .version("0.1.0")
        .about("An example of SCTP Server")
        .setting(AppSettings::DeriveDisplayOrder)
        .setting(AppSettings::SubcommandsNegateReqs)
        .arg(
            Arg::with_name("FULLHELP")
                .help("Prints more detailed help information")
                .long("fullhelp"),
        )
        .arg(
            Arg::with_name("port")
                .required_unless("FULLHELP")
                .takes_value(true)
                .long("port")
                .help("use port ."),
        );

    let matches = app.clone().get_matches();

    if matches.is_present("FULLHELP") {
        app.print_long_help().unwrap();
        std::process::exit(0);
    }

    let port1 = matches.value_of("port").unwrap().to_owned();
    let port2 = port1.clone();

    std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = DisconnectedPacketConn::new(Arc::new(
                    UdpSocket::bind(format!("127.0.0.1:{}", port1))
                        .await
                        .unwrap(),
                ));
                println!("listening {}...", conn.local_addr().unwrap());

                let config = Config {
                    net_conn: Arc::new(conn),
                    max_receive_buffer_size: 0,
                    max_message_size: 0,
                    name: "server".to_owned(),
                };
                let a = Association::server(config).await?;
                println!("created a server");

                let stream = a.accept_stream().await.unwrap();
                println!("accepted a stream");

                // set unordered = true and 10ms treshold for dropping packets
                stream.set_reliability_params(true, ReliabilityType::Timed, 10);

                let mut buff = vec![0u8; 65535];
                let mut recv = 0;
                let mut pkt_num = 0;
                let mut loop_num = 0;
                let mut now = tokio::time::Instant::now();
                while let Ok(n) = stream.read(&mut buff).await {
                    recv += n;
                    if n != 0 {
                        pkt_num += 1;
                    }
                    loop_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!(
                            "Throughput: {} Bytes/s, {} pkts, {} loops",
                            recv, pkt_num, loop_num
                        );
                        now = tokio::time::Instant::now();
                        recv = 0;
                        loop_num = 0;
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });

    std::thread::spawn(|| {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async move {
                let conn = Arc::new(UdpSocket::bind("0.0.0.0:0").await.unwrap());
                conn.connect(format!("127.0.0.1:{}", port2)).await.unwrap();
                println!("connecting {}..", format!("127.0.0.1:{}", port2));

                let config = Config {
                    net_conn: conn,
                    max_receive_buffer_size: 0,
                    max_message_size: 0,
                    name: "client".to_owned(),
                };
                let a = Association::client(config).await.unwrap();
                println!("created a client");

                let stream = a
                    .open_stream(0, PayloadProtocolIdentifier::Binary)
                    .await
                    .unwrap();
                println!("opened a stream");

                let mut buf = Vec::with_capacity(65535);
                unsafe {
                    buf.set_len(65535);
                }

                let mut now = tokio::time::Instant::now();
                let mut pkt_num = 0;
                while stream.write(&buf.clone().into()).is_ok() {
                    pkt_num += 1;
                    if now.elapsed().as_secs() == 1 {
                        println!("Send {} pkts", pkt_num);
                        now = tokio::time::Instant::now();
                        pkt_num = 0;
                    }
                }
                Result::<(), Error>::Ok(())
            })
    });
    loop {}
}

And got logs like

listening 127.0.0.1:8888...
connecting 127.0.0.1:8888..
created a server
created a client
opened a stream
accepted a stream
Send 20245 pkts
Throughput: 4980660 Bytes/s, 76 pkts
Send 20135 pkts
Throughput: 5046195 Bytes/s, 77 pkts
Send 19996 pkts
Throughput: 5111730 Bytes/s, 78 pkts

whatever the set_reliability_params is true or false

But if I remove the sctp layer and use tokio udp socket directly, everything is ok

Connect to 127.0.0.1:8888
Connected
listening 127.0.0.1:8888
Send 197388 pkts
Throughput: 3233792000 Bytes/s, 197375 pkts
Send 227920 pkts
Throughput: 3734208512 Bytes/s, 227918 pkts
Send 242558 pkts
Throughput: 3973890048 Bytes/s, 242547 pkts
Send 214998 pkts
Throughput: 3522510848 Bytes/s, 214997 pkts
Send 212757 pkts
Throughput: 3485302784 Bytes/s, 212726 pkts