diff --git a/transcoder/src/api/streams.go b/transcoder/src/api/streams.go index 16dbd115..33a1b5ee 100644 --- a/transcoder/src/api/streams.go +++ b/transcoder/src/api/streams.go @@ -6,6 +6,7 @@ import ( "strconv" "strings" + "github.com/golang-jwt/jwt/v5" "github.com/labstack/echo/v5" "github.com/zoriya/kyoo/transcoder/src" ) @@ -17,6 +18,7 @@ type shandler struct { func RegisterStreamHandlers(e *echo.Group, transcoder *src.Transcoder) { h := shandler{transcoder} + e.GET("/streams", h.GetStreams) e.GET("/:path/direct", DirectStream) e.GET("/:path/direct/:identifier", DirectStream) e.GET("/:path/master.m3u8", h.GetMaster) @@ -61,12 +63,13 @@ func (h *shandler) GetMaster(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, sha) + ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, profileId, sessionId, sha) if err != nil { return err } @@ -97,12 +100,22 @@ func (h *shandler) GetVideoIndex(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetVideoIndex(c.Request().Context(), path, uint32(video), quality, client, sha) + ret, err := h.transcoder.GetVideoIndex( + c.Request().Context(), + path, + uint32(video), + quality, + client, + profileId, + sessionId, + sha, + ) if err != nil { return err } @@ -133,12 +146,22 @@ func (h *shandler) GetAudioIndex(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetAudioIndex(c.Request().Context(), path, uint32(audio), quality, client, sha) + ret, err := h.transcoder.GetAudioIndex( + c.Request().Context(), + path, + uint32(audio), + quality, + client, + profileId, + sessionId, + sha, + ) if err != nil { return err } @@ -171,6 +194,7 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err @@ -183,6 +207,8 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error { quality, segment, client, + profileId, + sessionId, sha, ) if err != nil { @@ -216,18 +242,33 @@ func (h *shandler) GetAudioSegment(c *echo.Context) error { if err != nil { return err } + profileId, sessionId := getIdentity(c) path, sha, err := getPath(c) if err != nil { return err } - ret, err := h.transcoder.GetAudioSegment(c.Request().Context(), path, uint32(audio), quality, segment, client, sha) + ret, err := h.transcoder.GetAudioSegment( + c.Request().Context(), + path, + uint32(audio), + quality, + segment, + client, + profileId, + sessionId, + sha, + ) if err != nil { return err } return c.File(strings.TrimLeft(ret, "/")) } +func (h *shandler) GetStreams(c *echo.Context) error { + return c.JSON(http.StatusOK, h.transcoder.ListRunningStreams()) +} + func getClientId(c *echo.Context) (string, error) { key := c.QueryParam("clientId") if key == "" { @@ -239,6 +280,29 @@ func getClientId(c *echo.Context) (string, error) { return key, nil } +func getIdentity(c *echo.Context) (*string, *string) { + token, ok := c.Get("user").(*jwt.Token) + if !ok || token == nil { + return nil, nil + } + claims, ok := token.Claims.(jwt.MapClaims) + if !ok { + return nil, nil + } + + profileId := normalizeOptionalId(claims["sub"]) + sessionId := normalizeOptionalId(claims["sid"]) + return profileId, sessionId +} + +func normalizeOptionalId(value any) *string { + id, ok := value.(string) + if !ok || id == "" || id == "00000000-0000-0000-0000-000000000000" { + return nil + } + return &id +} + func parseSegment(segment string) (int32, error) { var ret int32 _, err := fmt.Sscanf(segment, "segment-%d.ts", &ret) diff --git a/transcoder/src/tracker.go b/transcoder/src/tracker.go index a6009b3c..2b48ea94 100644 --- a/transcoder/src/tracker.go +++ b/transcoder/src/tracker.go @@ -7,13 +7,15 @@ import ( ) type ClientInfo struct { - client string - sha string - path string - video *VideoKey - audio *AudioKey - vhead int32 - ahead int32 + client string + profileId *string + sessionId *string + sha string + path string + video *VideoKey + audio *AudioKey + vhead int32 + ahead int32 } type Tracker struct { @@ -25,6 +27,7 @@ type Tracker struct { lastUsage map[string]time.Time transcoder *Transcoder deletedStream chan string + snapshotReq chan chan map[string]ClientInfo } func NewTracker(t *Transcoder) *Tracker { @@ -33,6 +36,7 @@ func NewTracker(t *Transcoder) *Tracker { visitDate: make(map[string]time.Time), lastUsage: make(map[string]time.Time), deletedStream: make(chan string), + snapshotReq: make(chan chan map[string]ClientInfo), transcoder: t, } go ret.start() @@ -59,6 +63,12 @@ func (t *Tracker) start() { old, ok := t.clients[info.client] // First fixup the info. Most routes ruturn partial infos if ok && old.sha == info.sha { + if info.profileId == nil { + info.profileId = old.profileId + } + if info.sessionId == nil { + info.sessionId = old.sessionId + } if info.video == nil { info.video = old.video } @@ -98,6 +108,7 @@ func (t *Tracker) start() { case <-timer: timer = time.After(inactive_time) // Purge old clients + stale := make([]ClientInfo, 0) for client, date := range t.visitDate { if time.Since(date) < inactive_time { continue @@ -106,7 +117,10 @@ func (t *Tracker) start() { info := t.clients[client] delete(t.clients, client) delete(t.visitDate, client) + stale = append(stale, info) + } + for _, info := range stale { if !t.KillStreamIfDead(info.sha, info.path) { audio_cleanup := info.audio != nil && t.KillAudioIfDead(info.sha, info.path, *info.audio) video_cleanup := info.video != nil && t.KillVideoIfDead(info.sha, info.path, *info.video) @@ -117,6 +131,8 @@ func (t *Tracker) start() { } case path := <-t.deletedStream: t.DestroyStreamIfOld(path) + case reply := <-t.snapshotReq: + reply <- t.cloneClients() } } } @@ -143,7 +159,11 @@ func (t *Tracker) KillStreamIfDead(sha string, path string) bool { } func (t *Tracker) DestroyStreamIfOld(sha string) { - if time.Since(t.lastUsage[sha]) < 4*time.Hour { + lastUsage, ok := t.lastUsage[sha] + if !ok { + return + } + if time.Since(lastUsage) < 4*time.Hour { return } stream, ok := t.transcoder.streams.GetAndRemove(sha) @@ -239,3 +259,17 @@ func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) { } } } + +func (t *Tracker) SnapshotClients() map[string]ClientInfo { + ret := make(chan map[string]ClientInfo) + t.snapshotReq <- ret + return <-ret +} + +func (t *Tracker) cloneClients() map[string]ClientInfo { + out := make(map[string]ClientInfo, len(t.clients)) + for clientId, info := range t.clients { + out[clientId] = info + } + return out +} diff --git a/transcoder/src/transcode_status.go b/transcoder/src/transcode_status.go new file mode 100644 index 00000000..4d3b9f60 --- /dev/null +++ b/transcoder/src/transcode_status.go @@ -0,0 +1,222 @@ +package src + +import ( + "sort" +) + +type StreamStatus struct { + Path string `json:"path"` + Sha string `json:"sha"` + Duration float64 `json:"duration"` + Videos []TranscodeStatus `json:"videos"` + Audios []TranscodeStatus `json:"audios"` + Viewers []ClientStatus `json:"viewers"` +} + +type TranscodeStatus struct { + Index uint32 `json:"index"` + Quality string `json:"quality"` + Heads []HeadRange `json:"heads"` +} + +type HeadRange struct { + Start float64 `json:"start"` + End float64 `json:"end"` + StartHead int32 `json:"startHead"` + EndHead int32 `json:"endHead"` + IsRunning bool `json:"isRunning"` +} + +type ViewerTrack struct { + Index uint32 `json:"index"` + Quality string `json:"quality"` + Head int32 `json:"head"` +} + +type ClientStatus struct { + ClientId string `json:"clientId"` + ProfileId *string `json:"profileId"` + SessionId *string `json:"sessionId"` + Video *ViewerTrack `json:"video"` + Audio *ViewerTrack `json:"audio"` +} + +func (t *Transcoder) ListRunningStreams() []StreamStatus { + clients := t.tracker.SnapshotClients() + + t.streams.lock.RLock() + streams := make(map[string]*FileStream, len(t.streams.data)) + for sha, stream := range t.streams.data { + streams[sha] = stream + } + t.streams.lock.RUnlock() + + clientBySha := make(map[string][]ClientInfo) + for _, client := range clients { + clientBySha[client.sha] = append(clientBySha[client.sha], client) + } + + ret := make([]StreamStatus, 0, len(streams)) + for sha, stream := range streams { + if stream == nil || stream.Info == nil { + continue + } + + status := StreamStatus{ + Path: stream.Info.Path, + Sha: sha, + Duration: stream.Info.Duration, + Videos: listVideoStatuses(stream), + Audios: listAudioStatuses(stream), + Viewers: listClientStatuses(stream, clientBySha[sha]), + } + if len(status.Videos) == 0 && len(status.Audios) == 0 && len(status.Viewers) == 0 { + continue + } + ret = append(ret, status) + } + + sort.Slice(ret, func(i int, j int) bool { + return ret[i].Path < ret[j].Path + }) + return ret +} + +func listVideoStatuses(stream *FileStream) []TranscodeStatus { + stream.videos.lock.RLock() + ret := make([]TranscodeStatus, 0, len(stream.videos.data)) + for key, video := range stream.videos.data { + if video == nil { + continue + } + ret = append(ret, TranscodeStatus{ + Index: key.idx, + Quality: string(key.quality), + Heads: listHeadRanges(stream, &video.Stream, true, key.idx), + }) + } + stream.videos.lock.RUnlock() + + sort.Slice(ret, func(i int, j int) bool { + if ret[i].Index == ret[j].Index { + return ret[i].Quality < ret[j].Quality + } + return ret[i].Index < ret[j].Index + }) + return ret +} + +func listAudioStatuses(stream *FileStream) []TranscodeStatus { + stream.audios.lock.RLock() + ret := make([]TranscodeStatus, 0, len(stream.audios.data)) + for key, audio := range stream.audios.data { + if audio == nil { + continue + } + ret = append(ret, TranscodeStatus{ + Index: key.idx, + Quality: string(key.quality), + Heads: listHeadRanges(stream, &audio.Stream, false, key.idx), + }) + } + stream.audios.lock.RUnlock() + + sort.Slice(ret, func(i int, j int) bool { + if ret[i].Index == ret[j].Index { + return ret[i].Quality < ret[j].Quality + } + return ret[i].Index < ret[j].Index + }) + return ret +} + +func listClientStatuses(stream *FileStream, clients []ClientInfo) []ClientStatus { + ret := make([]ClientStatus, 0, len(clients)) + for _, client := range clients { + var video *ViewerTrack + if client.video != nil { + video = &ViewerTrack{ + Index: client.video.idx, + Quality: string(client.video.quality), + Head: client.vhead, + } + } + + var audio *ViewerTrack + if client.audio != nil { + audio = &ViewerTrack{ + Index: client.audio.idx, + Quality: string(client.audio.quality), + Head: client.ahead, + } + } + + ret = append(ret, ClientStatus{ + ClientId: client.client, + ProfileId: client.profileId, + SessionId: client.sessionId, + Video: video, + Audio: audio, + }) + } + + sort.Slice(ret, func(i int, j int) bool { + return ret[i].ClientId < ret[j].ClientId + }) + return ret +} + +func listHeadRanges(file *FileStream, stream *Stream, isVideo bool, index uint32) []HeadRange { + stream.lock.RLock() + defer stream.lock.RUnlock() + + ret := make([]HeadRange, 0, len(stream.heads)) + for _, head := range stream.heads { + if head == DeletedHead { + continue + } + + end := stream.file.Info.Duration + length, _ := stream.keyframes.Length() + if head.end <= length { + end = stream.keyframes.Get(head.end) + } + + ret = append(ret, HeadRange{ + Start: stream.keyframes.Get(head.segment), + End: end, + StartHead: head.segment, + EndHead: head.end, + IsRunning: head.command != nil && head.command.ProcessState == nil, + }) + } + + for i := int32(0); i < int32(len(stream.segments)); i++ { + if !stream.isSegmentReady(i) { + continue + } + + start := i + for i < int32(len(stream.segments)) && stream.isSegmentReady(i) { + i++ + } + end := i + + ret = append(ret, HeadRange{ + Start: stream.keyframes.Get(start), + End: stream.keyframes.Get(end), + StartHead: start, + EndHead: end, + IsRunning: false, + }) + i-- + } + + sort.Slice(ret, func(i int, j int) bool { + if ret[i].StartHead == ret[j].StartHead { + return ret[i].EndHead < ret[j].EndHead + } + return ret[i].StartHead < ret[j].StartHead + }) + return ret +} diff --git a/transcoder/src/transcoder.go b/transcoder/src/transcoder.go index 4a0e44e0..034c3b1b 100644 --- a/transcoder/src/transcoder.go +++ b/transcoder/src/transcoder.go @@ -50,20 +50,22 @@ func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string) return ret, nil } -func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) { +func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, profileId *string, sessionId *string, sha string) (string, error) { ctx = context.WithoutCancel(ctx) stream, err := t.getFileStream(ctx, path, sha) if err != nil { return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - video: nil, - audio: nil, - vhead: -1, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + video: nil, + audio: nil, + vhead: -1, + ahead: -1, } return stream.GetMaster(ctx, client), nil } @@ -74,6 +76,8 @@ func (t *Transcoder) GetVideoIndex( video uint32, quality VideoQuality, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -82,13 +86,15 @@ func (t *Transcoder) GetVideoIndex( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - video: &VideoKey{video, quality}, - audio: nil, - vhead: -1, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + video: &VideoKey{video, quality}, + audio: nil, + vhead: -1, + ahead: -1, } return stream.GetVideoIndex(ctx, video, quality, client) } @@ -99,6 +105,8 @@ func (t *Transcoder) GetAudioIndex( audio uint32, quality AudioQuality, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -107,12 +115,14 @@ func (t *Transcoder) GetAudioIndex( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - audio: &AudioKey{audio, quality}, - vhead: -1, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + audio: &AudioKey{audio, quality}, + vhead: -1, + ahead: -1, } return stream.GetAudioIndex(ctx, audio, quality, client) } @@ -124,6 +134,8 @@ func (t *Transcoder) GetVideoSegment( quality VideoQuality, segment int32, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -132,13 +144,15 @@ func (t *Transcoder) GetVideoSegment( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - video: &VideoKey{video, quality}, - vhead: segment, - audio: nil, - ahead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + video: &VideoKey{video, quality}, + vhead: segment, + audio: nil, + ahead: -1, } return stream.GetVideoSegment(ctx, video, quality, segment) } @@ -150,6 +164,8 @@ func (t *Transcoder) GetAudioSegment( quality AudioQuality, segment int32, client string, + profileId *string, + sessionId *string, sha string, ) (string, error) { ctx = context.WithoutCancel(ctx) @@ -158,12 +174,14 @@ func (t *Transcoder) GetAudioSegment( return "", err } t.clientChan <- ClientInfo{ - client: client, - sha: sha, - path: path, - audio: &AudioKey{audio, quality}, - ahead: segment, - vhead: -1, + client: client, + profileId: profileId, + sessionId: sessionId, + sha: sha, + path: path, + audio: &AudioKey{audio, quality}, + ahead: segment, + vhead: -1, } return stream.GetAudioSegment(ctx, audio, quality, segment) }