Intro
This time, I will try Pion/WebRTC.
Because Pion already has good examples, I will create a sample based on SFU-WebSocket of example-webrtc-applications.
I will try changing these points.
- Use SSE(Server-Sent Events) for signaling
- Start connecting manually
I will add WebRTC functions into the last sample project.
I also referred to this post (especially for the client side).
Environments
- Go ver.go1.18.2 windows/amd64
- Node.js ver.18.1.0
Connecting with WebRTC SFU
When I tried WebRTC last time, the server-side application was only used for signaling.
After signaling, the clients were directly connected to each other.
This time they will only connect to the server-side application.
After connecting, the clients will send video tracks and audio tracks to the server-side application.
The server-side application then sends the other clients' tracks to each client as remote tracks.
Samples
This time, I have published the sample project on GitHub.
Client-side
Because the connection process is started by the server-side application, the client side only needs to handle the offer and ICE candidate events.
I will also create an RTCPeerConnection on startup.
main.page.ts
...
/**
 * Dispatches one raw message received from the server.
 *
 * `value` must be a JSON document that passes checkIsClientMessage.
 * "text" messages are shown in the view; "offer" and "candidate" messages
 * carry a second JSON document in `data`, which is decoded and handed to the
 * WebRTC controller. Malformed JSON (outer message or nested payload) is
 * logged and dropped instead of throwing into the caller.
 */
function handleReceivedMessage(value: string) {
    let message: unknown;
    try {
        message = JSON.parse(value);
    } catch (err) {
        console.error(`Failed to parse the received message: ${err}`);
        return;
    }
    if(!checkIsClientMessage(message)) {
        return;
    }
    switch(message.event) {
        case "text":
            view.addReceivedText({ user: message.userName, message: message.data });
            break;
        case "offer":
        case "candidate": {
            // Only the JSON decoding is guarded; errors raised by the WebRTC
            // handlers themselves still propagate as before.
            let payload;
            try {
                payload = JSON.parse(message.data);
            } catch (err) {
                console.error(`Failed to parse the "${message.event}" payload: ${err}`);
                return;
            }
            if(message.event === "offer") {
                webrtc.handleOffer(payload);
            } else {
                webrtc.handleCandidate(payload);
            }
            break;
        }
    }
}
/**
 * Posts the local session description to the server as an "answer" message.
 * Nothing is sent when hasAnyTexts(userName) is false.
 */
function sendAnswer(data: RTCSessionDescriptionInit) {
    if(hasAnyTexts(userName)) {
        const payload = JSON.stringify(data);
        sse.sendMessage({ userName, event: "answer", data: payload });
    }
}
/**
 * Posts a locally gathered ICE candidate to the server as a "candidate" message.
 * Nothing is sent when hasAnyTexts(userName) is false.
 */
function sendCandidate(data: RTCIceCandidate) {
    if(hasAnyTexts(userName)) {
        const payload = JSON.stringify(data);
        sse.sendMessage({ userName, event: "candidate", data: payload });
    }
}
/**
 * Type guard for messages pushed by the server-side application.
 *
 * @param value any value, typically the result of JSON.parse
 * @returns true when `value` is an object whose "event" and "data" properties
 *          are both strings
 */
function checkIsClientMessage(value: any): value is ClientMessage {
    // All messages from the server-side application are sent as "ClientMessage".
    // JSON.parse can also yield primitives (numbers, strings, booleans); the
    // "in" operator throws a TypeError on those, so reject non-objects first.
    if(value == null || typeof value !== "object") {
        return false;
    }
    if(("event" in value &&
        typeof value["event"] === "string") === false) {
        return false;
    }
    if(("data" in value &&
        typeof value["data"] === "string") === false) {
        return false;
    }
    return true;
}
// Start the page by calling init(); its definition is not part of this excerpt.
init();
webrtc.controller.ts
export class WebRtcController {
private webcamStream: MediaStream|null = null;
private peerConnection: RTCPeerConnection|null = null;
private answerSentEvent: ((data: RTCSessionDescriptionInit) => void)|null = null;
private candidateSentEvent: ((data: RTCIceCandidate) => void)|null = null;
/**
 * Starts the local webcam preview and, once a MediaStream has been obtained,
 * creates the RTCPeerConnection through connect().
 *
 * Failures (missing "local_video" element, denied camera/microphone access,
 * blocked playback) are logged to the console; nothing is thrown.
 */
public init() {
    const localVideo = document.getElementById("local_video");
    if(!(localVideo instanceof HTMLVideoElement)) {
        console.error(`An error occurred: the "local_video" element was not found`);
        return;
    }
    localVideo.addEventListener("canplay", () => {
        // Without a known intrinsic width the ratio below would divide by zero.
        if(localVideo.videoWidth === 0) {
            return;
        }
        // Fix the displayed width and derive the height from the aspect ratio.
        const width = 320;
        const height = localVideo.videoHeight / (localVideo.videoWidth/width);
        localVideo.setAttribute("width", width.toString());
        localVideo.setAttribute("height", height.toString());
    }, false);
    navigator.mediaDevices.getUserMedia({ video: true, audio: true })
        .then(stream => {
            localVideo.srcObject = stream;
            // play() returns a promise; catch its rejection here so that it
            // does not become an unhandled rejection.
            localVideo.play()
                .catch(err => console.error(`An error occurred: ${err}`));
            this.webcamStream = stream;
            // create a RTCPeerConnection after getting local MediaStream
            this.connect();
        })
        .catch(err => console.error(`An error occurred: ${err}`));
}
...
/**
 * Handle a received offer and send an answer.
 *
 * The steps run strictly in sequence: apply the remote offer, create the
 * answer, apply it as the local description, then pass it to the registered
 * answerSentEvent callback. Any failure in the chain is logged.
 *
 * @param data the offer received from the server; ignored when null/undefined
 *             or when no RTCPeerConnection has been created yet
 */
public handleOffer(data: RTCSessionDescription|null|undefined) {
    const peerConnection = this.peerConnection;
    if(peerConnection == null ||
        data == null) {
        return;
    }
    peerConnection.setRemoteDescription(data)
        .then(() => peerConnection.createAnswer())
        // Hand the answer on only after it has been applied locally.
        .then(answer => peerConnection.setLocalDescription(answer)
            .then(() => answer))
        .then(answer => {
            if(this.answerSentEvent != null) {
                this.answerSentEvent(answer);
            }
        })
        .catch(err => console.error(`Failed to answer the offer: ${err}`));
}
/**
 * Add a remote ICE candidate to the peer connection.
 *
 * @param data the candidate received from the server; ignored when
 *             null/undefined or when no RTCPeerConnection has been created yet
 */
public handleCandidate(data: RTCIceCandidate|null|undefined) {
    if(this.peerConnection == null ||
        data == null) {
        return;
    }
    // addIceCandidate() returns a promise; log a rejection instead of leaving
    // it unhandled.
    this.peerConnection.addIceCandidate(data)
        .catch(err => console.error(`Failed to add the ICE candidate: ${err}`));
}
/**
 * Creates the RTCPeerConnection, wires its event handlers and attaches every
 * track of the local webcam stream. Does nothing until the webcam stream exists.
 */
private connect() {
    const localStream = this.webcamStream;
    if(localStream == null) {
        return;
    }
    const connection = new RTCPeerConnection({
        iceServers: [{
            urls: `stun:stun.l.google.com:19302`, // A STUN server
        }]
    });
    this.peerConnection = connection;

    // Each remote video track gets its own <video> element; audio-only events are skipped.
    connection.ontrack = (ev) => {
        if (ev.track.kind === "audio") {
            return;
        }
        const remoteStream = ev.streams[0];
        if (remoteStream == null) {
            return;
        }
        const remoteVideo = document.createElement("video");
        remoteVideo.srcObject = remoteStream;
        remoteVideo.autoplay = true;
        remoteVideo.controls = true;
        const videoArea = document.getElementById("remote_video_area") as HTMLElement;
        videoArea.appendChild(remoteVideo);

        ev.track.onmute = () => {
            remoteVideo.play();
        };
        // Drop the element again once a track is removed from the stream.
        remoteStream.onremovetrack = () => {
            const parent = remoteVideo.parentNode;
            if (parent != null) {
                parent.removeChild(remoteVideo);
            }
        };
    };

    // Publish the local tracks.
    for (const track of localStream.getTracks()) {
        connection.addTrack(track, localStream);
    }

    // Forward locally gathered candidates to whoever registered candidateSentEvent.
    connection.onicecandidate = ev => {
        const candidate = ev.candidate;
        if (candidate != null && this.candidateSentEvent != null) {
            this.candidateSentEvent(candidate);
        }
    };
}
}
Server-side
sseClient.go
package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"github.com/pion/webrtc/v3"
)
// SSEClient holds the state of one Server-Sent Events subscriber.
// The three channels carry PeerConnection callback events (see
// NewPeerConnectionState) into the select loop of registerSSEClient.
type SSEClient struct {
// candidateFound carries ICE candidates reported by OnICECandidate.
candidateFound chan *webrtc.ICECandidate
// changeConnectionState carries states reported by OnConnectionStateChange.
changeConnectionState chan webrtc.PeerConnectionState
// addTrack carries remote tracks reported by OnTrack.
addTrack chan *webrtc.TrackRemote
// userName is compared with ClientMessage.UserName to route answers and candidates.
userName string
// w is the response writer that the SSE "data:" events are written to.
w http.ResponseWriter
}
// newSSEClient builds an SSEClient for userName that writes its events to w.
// All three event channels are created unbuffered.
func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
	client := new(SSEClient)
	client.userName = userName
	client.w = w
	client.candidateFound = make(chan *webrtc.ICECandidate)
	client.changeConnectionState = make(chan webrtc.PeerConnectionState)
	client.addTrack = make(chan *webrtc.TrackRemote)
	return client
}
// registerSSEClient serves one SSE subscription.
//
// It reads the "user" query parameter, creates an SSEClient and a
// PeerConnection for it, registers them with hub, and then blocks, forwarding
// PeerConnection events until the connection fails or closes or the request
// context is cancelled. On exit the client is unregistered, the PeerConnection
// is closed and the client's channels are closed.
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
	userName, err := getParam(r, "user")
	if err != nil {
		log.Println(err.Error())
		fmt.Fprint(w, "The parameters have no names")
		return
	}
	// For pushing data to clients, I call "flusher.Flush()".
	// Verify that the writer supports it before allocating anything: calling
	// Flush on the nil interface left by a failed assertion would panic.
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Println("The ResponseWriter does not support flushing")
		http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
		return
	}
	newClient := newSSEClient(userName, w)
	ps, err := NewPeerConnectionState(newClient)
	if err != nil {
		log.Println(err.Error())
		fmt.Fprint(w, "Failed connection")
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	hub.register <- ps

	defer func() {
		hub.unregister <- ps
		if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
			ps.peerConnection.Close()
		}
		close(newClient.candidateFound)
		close(newClient.changeConnectionState)
		close(newClient.addTrack)
	}()
	for {
		// handle PeerConnection events and close SSE event.
		select {
		case candidate := <-newClient.candidateFound:
			jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
			if err != nil {
				log.Println(err.Error())
				return
			}
			fmt.Fprintf(w, "data: %s\n\n", jsonValue)
			flusher.Flush()
		case track := <-newClient.addTrack:
			// Hand the new remote track to the hub, which shares it with the other peers.
			hub.addTrack <- track
		case connectionState := <-newClient.changeConnectionState:
			switch connectionState {
			case webrtc.PeerConnectionStateFailed:
				return
			case webrtc.PeerConnectionStateClosed:
				return
			}
		case <-r.Context().Done():
			// when "es.close()" is called, this loop operation will be ended.
			return
		}
	}
}
// sendSSEMessage decodes a ClientMessage from the request body, passes it to
// the hub's broadcast channel and replies with a JSON result document.
func sendSSEMessage(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
	w.Header().Set("Content-Type", "application/json")
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Println(err.Error())
		j, _ := json.Marshal(GetFailed("Failed reading values from body"))
		w.Write(j)
		return
	}
	// Decode into a value, not into a pointer to a pointer: with the latter a
	// JSON "null" body sets the pointer to nil and dereferencing it panics.
	var message ClientMessage
	if err = json.Unmarshal(body, &message); err != nil {
		log.Println(err.Error())
		j, _ := json.Marshal(GetFailed("Failed converting to ClientMessage"))
		w.Write(j)
		return
	}
	w.WriteHeader(http.StatusOK)
	hub.broadcast <- message
	data, _ := json.Marshal(GetSucceeded())
	w.Write(data)
}
peerConnectionState.go
package main
import (
"github.com/pion/webrtc/v3"
)
// PeerConnectionState pairs a WebRTC PeerConnection with the SSE client whose
// event channels its callbacks send to.
type PeerConnectionState struct {
// peerConnection is the server-side connection created by NewPeerConnectionState.
peerConnection *webrtc.PeerConnection
// client is the SSE subscriber passed to NewPeerConnectionState.
client *SSEClient
}
// NewPeerConnectionState creates a PeerConnection for client, adds one
// receive-only transceiver each for video and audio, and forwards ICE
// candidate, connection state and track events to the client's channels.
//
// It returns an error when the PeerConnection or a transceiver cannot be
// created; in the latter case the partially configured PeerConnection is
// closed before returning.
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
	peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
		ICEServers: []webrtc.ICEServer{
			{
				URLs: []string{
					"stun:stun.l.google.com:19302",
				},
			},
		},
	})
	if err != nil {
		return nil, err
	}
	for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
		if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
			Direction: webrtc.RTPTransceiverDirectionRecvonly,
		}); err != nil {
			// Do not leak the connection that is being abandoned.
			_ = peerConnection.Close()
			return nil, err
		}
	}
	// Add event handlers.
	peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
		if i == nil {
			return
		}
		client.candidateFound <- i
	})
	peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
		// avoid panic after closing channel:
		// the receive blocks until the channel delivers a value or has been
		// closed (registerSSEClient closes it on exit). When it is closed, ok
		// is false and nothing is sent.
		if p == webrtc.PeerConnectionStateClosed {
			_, ok := <-client.changeConnectionState
			if ok {
				client.changeConnectionState <- p
			}
			return
		}
		client.changeConnectionState <- p
	})
	peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		client.addTrack <- t
	})
	return &PeerConnectionState{
		peerConnection: peerConnection,
		client:         client,
	}, nil
}
sseHub.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v3"
)
// SSEHub keeps the set of connected peers and the tracks shared between them.
// Its channels are served by run().
type SSEHub struct {
// clients is the set of registered peers; only the keys are used.
clients map[*PeerConnectionState]bool
// broadcast receives client messages to be handled by handleReceivedMessage.
broadcast chan ClientMessage
// register receives peers to add to clients.
register chan *PeerConnectionState
// unregister receives peers to remove from clients.
unregister chan *PeerConnectionState
// trackLocals maps a remote track's ID to the local track that relays it.
trackLocals map[string]*webrtc.TrackLocalStaticRTP
// addTrack receives remote tracks for which a local relay track must be created.
addTrack chan *webrtc.TrackRemote
}
// newSSEHub returns a hub with empty client and track maps and unbuffered channels.
func newSSEHub() *SSEHub {
	hub := &SSEHub{}
	hub.clients = map[*PeerConnectionState]bool{}
	hub.trackLocals = make(map[string]*webrtc.TrackLocalStaticRTP)
	hub.broadcast = make(chan ClientMessage)
	hub.register = make(chan *PeerConnectionState)
	hub.unregister = make(chan *PeerConnectionState)
	hub.addTrack = make(chan *webrtc.TrackRemote)
	return hub
}
// run is the hub's event loop. It never returns: it serves registrations,
// unregistrations, new tracks and client messages, and requests key frames
// from all publishers every three seconds.
func (h *SSEHub) run() {
	// The key-frame tick is handled inside the select below, not on a separate
	// goroutine, so dispatchKeyFrame never iterates h.clients while this loop
	// is modifying it.
	keyFrameTicker := time.NewTicker(time.Second * 3)
	defer keyFrameTicker.Stop()
	for {
		select {
		case <-keyFrameTicker.C:
			dispatchKeyFrame(h)
		case client := <-h.register:
			h.clients[client] = true
			signalPeerConnections(h)
		case client := <-h.unregister:
			if _, ok := h.clients[client]; ok {
				delete(h.clients, client)
				signalPeerConnections(h)
			}
		case track := <-h.addTrack:
			trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
				track.ID(), track.StreamID())
			if err != nil {
				// Skip this track only; returning here would stop the hub
				// and block every later send on its channels.
				log.Println(err.Error())
				continue
			}
			h.trackLocals[track.ID()] = trackLocal
			signalPeerConnections(h)
			// NOTE(review): updateTrackValue runs on its own goroutine and
			// touches h.trackLocals; no lock is visible in this file.
			go updateTrackValue(h, track)
		case message := <-h.broadcast:
			handleReceivedMessage(h, message)
		}
	}
}
// updateTrackValue relays RTP packets from the remote track to the local track
// registered under the same ID until reading or writing fails. On exit it
// removes the local track from the hub and re-signals all peers.
func updateTrackValue(h *SSEHub, track *webrtc.TrackRemote) {
	defer func() {
		// NOTE(review): this runs on the relay goroutine while run() also
		// mutates h.trackLocals; no lock is visible in this file.
		delete(h.trackLocals, track.ID())
		signalPeerConnections(h)
	}()
	// Resolve the destination once. A per-packet lookup repeats invariant work
	// and dereferences nil if the entry has been removed in the meantime.
	trackLocal, ok := h.trackLocals[track.ID()]
	if !ok || trackLocal == nil {
		return
	}
	buf := make([]byte, 1500)
	for {
		n, _, err := track.Read(buf)
		if err != nil {
			return
		}
		if _, err = trackLocal.Write(buf[:n]); err != nil {
			return
		}
	}
}
// handleReceivedMessage processes one message posted by a client.
//
//   - TextEvent: the message is re-encoded as JSON and pushed to every client
//     as an SSE "data:" event.
//   - CandidateEvent: message.Data is decoded as an ICE candidate and added to
//     the PeerConnection of each client whose userName matches.
//   - AnswerEvent: message.Data is decoded as a session description and set as
//     the remote description of each client whose userName matches.
//
// Errors are logged and end the handling of the message.
func handleReceivedMessage(h *SSEHub, message ClientMessage) {
	switch message.Event {
	case TextEvent:
		m, err := json.Marshal(message)
		if err != nil {
			log.Println(err)
			return
		}
		jsonText := string(m)
		for client := range h.clients {
			w := client.client.w
			fmt.Fprintf(w, "data: %s\n\n", jsonText)
			// Flush only when the writer supports it; calling Flush on the
			// nil interface left by a failed assertion would panic.
			if flusher, ok := w.(http.Flusher); ok {
				flusher.Flush()
			}
		}
	case CandidateEvent:
		candidate := webrtc.ICECandidateInit{}
		if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
			log.Println(err)
			return
		}
		for pc := range h.clients {
			if pc.client.userName == message.UserName {
				if err := pc.peerConnection.AddICECandidate(candidate); err != nil {
					log.Println(err)
					return
				}
			}
		}
	case AnswerEvent:
		answer := webrtc.SessionDescription{}
		if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
			log.Println(err)
			return
		}
		for pc := range h.clients {
			if pc.client.userName == message.UserName {
				if err := pc.peerConnection.SetRemoteDescription(answer); err != nil {
					log.Println(err)
					return
				}
			}
		}
	}
}
// signalPeerConnections brings every peer's senders in line with the hub's
// tracks and sends each peer a fresh offer. attemptSync is tried up to 25
// times in a row; if it still asks for a retry, another round is scheduled
// three seconds later. A key-frame request is dispatched whenever the function
// returns.
func signalPeerConnections(h *SSEHub) {
	defer dispatchKeyFrame(h)

	const maxAttempts = 25
	for attempt := 0; attempt < maxAttempts; attempt++ {
		// For ignoring errors like below, execute attemptSync multiple times.
		// InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
		if retry := attemptSync(h); !retry {
			return
		}
	}

	// Still not in sync: give up for now and try again in 3 seconds, so that a
	// pending RemoveTrack or AddTrack is not held up by this loop.
	go func() {
		time.Sleep(time.Second * 3)
		signalPeerConnections(h)
	}()
}
// attemptSync shares the received tracks with all connected peers.
//
// For every peer it removes senders whose track is no longer in h.trackLocals,
// adds the hub tracks the peer neither sends nor receives yet, then creates a
// new offer, sets it as the local description and writes it to the peer as an
// SSE "data:" event.
//
// It returns true when the caller should call it again (a closed peer was
// removed, or any step failed) and false once every peer has been processed.
func attemptSync(h *SSEHub) bool {
for ps := range h.clients {
if ps.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
delete(h.clients, ps)
// We modified the map, start from the beginning
return true
}
// IDs of the tracks this peer already sends or receives.
existingSenders := map[string]bool{}
for _, sender := range ps.peerConnection.GetSenders() {
if sender.Track() == nil {
continue
}
existingSenders[sender.Track().ID()] = true
// The hub no longer has this track, so stop sending it to the peer.
if _, ok := h.trackLocals[sender.Track().ID()]; !ok {
if err := ps.peerConnection.RemoveTrack(sender); err != nil {
return true
}
}
}
// Tracks received from this peer are marked as existing too, so the peer's
// own tracks are not added back to it below.
for _, receiver := range ps.peerConnection.GetReceivers() {
if receiver.Track() == nil {
continue
}
existingSenders[receiver.Track().ID()] = true
}
// Add every hub track the peer does not have yet.
for trackID := range h.trackLocals {
if _, ok := existingSenders[trackID]; !ok {
if _, err := ps.peerConnection.AddTrack(h.trackLocals[trackID]); err != nil {
return true
}
}
}
// Renegotiate: create the offer, serialize it, apply it locally, push it to the peer.
offer, err := ps.peerConnection.CreateOffer(nil)
if err != nil {
return true
}
messageJSON, err := NewOfferMessageJSON(ps.client.userName, offer)
if err != nil {
return true
}
if err = ps.peerConnection.SetLocalDescription(offer); err != nil {
return true
}
// NOTE(review): the assertion result is not checked; flusher is nil (and
// Flush panics) if the writer does not implement http.Flusher.
flusher, _ := ps.client.w.(http.Flusher)
fmt.Fprintf(ps.client.w, "data: %s\n\n", messageJSON)
flusher.Flush()
}
return false
}
// dispatchKeyFrame sends an RTCP Picture Loss Indication for every track
// currently received from any connected peer. Write errors are ignored.
func dispatchKeyFrame(h *SSEHub) {
	for state := range h.clients {
		pc := state.peerConnection
		for _, recv := range pc.GetReceivers() {
			remote := recv.Track()
			if remote == nil {
				continue
			}
			pli := &rtcp.PictureLossIndication{
				MediaSSRC: uint32(remote.SSRC()),
			}
			_ = pc.WriteRTCP([]rtcp.Packet{pli})
		}
	}
}
Channels
I create channels in SSEClient and SSEHub.
At first, I tried adding some channels to SSEClient so that SSEHub could send messages through them.
But when I did that, the application hung when I sent text messages after the WebRTC connection was established.
I think the cause was a circular reference, so I removed these channels and now send the messages directly from SSEHub.