Add /videos/streams route to debug transcoder

This commit is contained in:
2026-04-27 16:43:33 +02:00
parent fd19b484ff
commit 60301f272d
4 changed files with 384 additions and 46 deletions
+68 -4
View File
@@ -6,6 +6,7 @@ import (
"strconv"
"strings"
"github.com/golang-jwt/jwt/v5"
"github.com/labstack/echo/v5"
"github.com/zoriya/kyoo/transcoder/src"
)
@@ -17,6 +18,7 @@ type shandler struct {
func RegisterStreamHandlers(e *echo.Group, transcoder *src.Transcoder) {
h := shandler{transcoder}
e.GET("/streams", h.GetStreams)
e.GET("/:path/direct", DirectStream)
e.GET("/:path/direct/:identifier", DirectStream)
e.GET("/:path/master.m3u8", h.GetMaster)
@@ -61,12 +63,13 @@ func (h *shandler) GetMaster(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, sha)
ret, err := h.transcoder.GetMaster(c.Request().Context(), path, client, profileId, sessionId, sha)
if err != nil {
return err
}
@@ -97,12 +100,22 @@ func (h *shandler) GetVideoIndex(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetVideoIndex(c.Request().Context(), path, uint32(video), quality, client, sha)
ret, err := h.transcoder.GetVideoIndex(
c.Request().Context(),
path,
uint32(video),
quality,
client,
profileId,
sessionId,
sha,
)
if err != nil {
return err
}
@@ -133,12 +146,22 @@ func (h *shandler) GetAudioIndex(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetAudioIndex(c.Request().Context(), path, uint32(audio), quality, client, sha)
ret, err := h.transcoder.GetAudioIndex(
c.Request().Context(),
path,
uint32(audio),
quality,
client,
profileId,
sessionId,
sha,
)
if err != nil {
return err
}
@@ -171,6 +194,7 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
@@ -183,6 +207,8 @@ func (h *shandler) GetVideoSegment(c *echo.Context) error {
quality,
segment,
client,
profileId,
sessionId,
sha,
)
if err != nil {
@@ -216,18 +242,33 @@ func (h *shandler) GetAudioSegment(c *echo.Context) error {
if err != nil {
return err
}
profileId, sessionId := getIdentity(c)
path, sha, err := getPath(c)
if err != nil {
return err
}
ret, err := h.transcoder.GetAudioSegment(c.Request().Context(), path, uint32(audio), quality, segment, client, sha)
ret, err := h.transcoder.GetAudioSegment(
c.Request().Context(),
path,
uint32(audio),
quality,
segment,
client,
profileId,
sessionId,
sha,
)
if err != nil {
return err
}
return c.File(strings.TrimLeft(ret, "/"))
}
func (h *shandler) GetStreams(c *echo.Context) error {
return c.JSON(http.StatusOK, h.transcoder.ListRunningStreams())
}
func getClientId(c *echo.Context) (string, error) {
key := c.QueryParam("clientId")
if key == "" {
@@ -239,6 +280,29 @@ func getClientId(c *echo.Context) (string, error) {
return key, nil
}
func getIdentity(c *echo.Context) (*string, *string) {
token, ok := c.Get("user").(*jwt.Token)
if !ok || token == nil {
return nil, nil
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
return nil, nil
}
profileId := normalizeOptionalId(claims["sub"])
sessionId := normalizeOptionalId(claims["sid"])
return profileId, sessionId
}
func normalizeOptionalId(value any) *string {
id, ok := value.(string)
if !ok || id == "" || id == "00000000-0000-0000-0000-000000000000" {
return nil
}
return &id
}
func parseSegment(segment string) (int32, error) {
var ret int32
_, err := fmt.Sscanf(segment, "segment-%d.ts", &ret)
+42 -8
View File
@@ -7,13 +7,15 @@ import (
)
type ClientInfo struct {
client string
sha string
path string
video *VideoKey
audio *AudioKey
vhead int32
ahead int32
client string
profileId *string
sessionId *string
sha string
path string
video *VideoKey
audio *AudioKey
vhead int32
ahead int32
}
type Tracker struct {
@@ -25,6 +27,7 @@ type Tracker struct {
lastUsage map[string]time.Time
transcoder *Transcoder
deletedStream chan string
snapshotReq chan chan map[string]ClientInfo
}
func NewTracker(t *Transcoder) *Tracker {
@@ -33,6 +36,7 @@ func NewTracker(t *Transcoder) *Tracker {
visitDate: make(map[string]time.Time),
lastUsage: make(map[string]time.Time),
deletedStream: make(chan string),
snapshotReq: make(chan chan map[string]ClientInfo),
transcoder: t,
}
go ret.start()
@@ -59,6 +63,12 @@ func (t *Tracker) start() {
old, ok := t.clients[info.client]
// First fixup the info. Most routes ruturn partial infos
if ok && old.sha == info.sha {
if info.profileId == nil {
info.profileId = old.profileId
}
if info.sessionId == nil {
info.sessionId = old.sessionId
}
if info.video == nil {
info.video = old.video
}
@@ -98,6 +108,7 @@ func (t *Tracker) start() {
case <-timer:
timer = time.After(inactive_time)
// Purge old clients
stale := make([]ClientInfo, 0)
for client, date := range t.visitDate {
if time.Since(date) < inactive_time {
continue
@@ -106,7 +117,10 @@ func (t *Tracker) start() {
info := t.clients[client]
delete(t.clients, client)
delete(t.visitDate, client)
stale = append(stale, info)
}
for _, info := range stale {
if !t.KillStreamIfDead(info.sha, info.path) {
audio_cleanup := info.audio != nil && t.KillAudioIfDead(info.sha, info.path, *info.audio)
video_cleanup := info.video != nil && t.KillVideoIfDead(info.sha, info.path, *info.video)
@@ -117,6 +131,8 @@ func (t *Tracker) start() {
}
case path := <-t.deletedStream:
t.DestroyStreamIfOld(path)
case reply := <-t.snapshotReq:
reply <- t.cloneClients()
}
}
}
@@ -143,7 +159,11 @@ func (t *Tracker) KillStreamIfDead(sha string, path string) bool {
}
func (t *Tracker) DestroyStreamIfOld(sha string) {
if time.Since(t.lastUsage[sha]) < 4*time.Hour {
lastUsage, ok := t.lastUsage[sha]
if !ok {
return
}
if time.Since(lastUsage) < 4*time.Hour {
return
}
stream, ok := t.transcoder.streams.GetAndRemove(sha)
@@ -239,3 +259,17 @@ func (t *Tracker) killOrphanedeheads(stream *Stream, is_video bool) {
}
}
}
func (t *Tracker) SnapshotClients() map[string]ClientInfo {
ret := make(chan map[string]ClientInfo)
t.snapshotReq <- ret
return <-ret
}
func (t *Tracker) cloneClients() map[string]ClientInfo {
out := make(map[string]ClientInfo, len(t.clients))
for clientId, info := range t.clients {
out[clientId] = info
}
return out
}
+222
View File
@@ -0,0 +1,222 @@
package src
import (
"sort"
)
type StreamStatus struct {
Path string `json:"path"`
Sha string `json:"sha"`
Duration float64 `json:"duration"`
Videos []TranscodeStatus `json:"videos"`
Audios []TranscodeStatus `json:"audios"`
Viewers []ClientStatus `json:"viewers"`
}
type TranscodeStatus struct {
Index uint32 `json:"index"`
Quality string `json:"quality"`
Heads []HeadRange `json:"heads"`
}
type HeadRange struct {
Start float64 `json:"start"`
End float64 `json:"end"`
StartHead int32 `json:"startHead"`
EndHead int32 `json:"endHead"`
IsRunning bool `json:"isRunning"`
}
type ViewerTrack struct {
Index uint32 `json:"index"`
Quality string `json:"quality"`
Head int32 `json:"head"`
}
type ClientStatus struct {
ClientId string `json:"clientId"`
ProfileId *string `json:"profileId"`
SessionId *string `json:"sessionId"`
Video *ViewerTrack `json:"video"`
Audio *ViewerTrack `json:"audio"`
}
func (t *Transcoder) ListRunningStreams() []StreamStatus {
clients := t.tracker.SnapshotClients()
t.streams.lock.RLock()
streams := make(map[string]*FileStream, len(t.streams.data))
for sha, stream := range t.streams.data {
streams[sha] = stream
}
t.streams.lock.RUnlock()
clientBySha := make(map[string][]ClientInfo)
for _, client := range clients {
clientBySha[client.sha] = append(clientBySha[client.sha], client)
}
ret := make([]StreamStatus, 0, len(streams))
for sha, stream := range streams {
if stream == nil || stream.Info == nil {
continue
}
status := StreamStatus{
Path: stream.Info.Path,
Sha: sha,
Duration: stream.Info.Duration,
Videos: listVideoStatuses(stream),
Audios: listAudioStatuses(stream),
Viewers: listClientStatuses(stream, clientBySha[sha]),
}
if len(status.Videos) == 0 && len(status.Audios) == 0 && len(status.Viewers) == 0 {
continue
}
ret = append(ret, status)
}
sort.Slice(ret, func(i int, j int) bool {
return ret[i].Path < ret[j].Path
})
return ret
}
func listVideoStatuses(stream *FileStream) []TranscodeStatus {
stream.videos.lock.RLock()
ret := make([]TranscodeStatus, 0, len(stream.videos.data))
for key, video := range stream.videos.data {
if video == nil {
continue
}
ret = append(ret, TranscodeStatus{
Index: key.idx,
Quality: string(key.quality),
Heads: listHeadRanges(stream, &video.Stream, true, key.idx),
})
}
stream.videos.lock.RUnlock()
sort.Slice(ret, func(i int, j int) bool {
if ret[i].Index == ret[j].Index {
return ret[i].Quality < ret[j].Quality
}
return ret[i].Index < ret[j].Index
})
return ret
}
func listAudioStatuses(stream *FileStream) []TranscodeStatus {
stream.audios.lock.RLock()
ret := make([]TranscodeStatus, 0, len(stream.audios.data))
for key, audio := range stream.audios.data {
if audio == nil {
continue
}
ret = append(ret, TranscodeStatus{
Index: key.idx,
Quality: string(key.quality),
Heads: listHeadRanges(stream, &audio.Stream, false, key.idx),
})
}
stream.audios.lock.RUnlock()
sort.Slice(ret, func(i int, j int) bool {
if ret[i].Index == ret[j].Index {
return ret[i].Quality < ret[j].Quality
}
return ret[i].Index < ret[j].Index
})
return ret
}
func listClientStatuses(stream *FileStream, clients []ClientInfo) []ClientStatus {
ret := make([]ClientStatus, 0, len(clients))
for _, client := range clients {
var video *ViewerTrack
if client.video != nil {
video = &ViewerTrack{
Index: client.video.idx,
Quality: string(client.video.quality),
Head: client.vhead,
}
}
var audio *ViewerTrack
if client.audio != nil {
audio = &ViewerTrack{
Index: client.audio.idx,
Quality: string(client.audio.quality),
Head: client.ahead,
}
}
ret = append(ret, ClientStatus{
ClientId: client.client,
ProfileId: client.profileId,
SessionId: client.sessionId,
Video: video,
Audio: audio,
})
}
sort.Slice(ret, func(i int, j int) bool {
return ret[i].ClientId < ret[j].ClientId
})
return ret
}
func listHeadRanges(file *FileStream, stream *Stream, isVideo bool, index uint32) []HeadRange {
stream.lock.RLock()
defer stream.lock.RUnlock()
ret := make([]HeadRange, 0, len(stream.heads))
for _, head := range stream.heads {
if head == DeletedHead {
continue
}
end := stream.file.Info.Duration
length, _ := stream.keyframes.Length()
if head.end <= length {
end = stream.keyframes.Get(head.end)
}
ret = append(ret, HeadRange{
Start: stream.keyframes.Get(head.segment),
End: end,
StartHead: head.segment,
EndHead: head.end,
IsRunning: head.command != nil && head.command.ProcessState == nil,
})
}
for i := int32(0); i < int32(len(stream.segments)); i++ {
if !stream.isSegmentReady(i) {
continue
}
start := i
for i < int32(len(stream.segments)) && stream.isSegmentReady(i) {
i++
}
end := i
ret = append(ret, HeadRange{
Start: stream.keyframes.Get(start),
End: stream.keyframes.Get(end),
StartHead: start,
EndHead: end,
IsRunning: false,
})
i--
}
sort.Slice(ret, func(i int, j int) bool {
if ret[i].StartHead == ret[j].StartHead {
return ret[i].EndHead < ret[j].EndHead
}
return ret[i].StartHead < ret[j].StartHead
})
return ret
}
+52 -34
View File
@@ -50,20 +50,22 @@ func (t *Transcoder) getFileStream(ctx context.Context, path string, sha string)
return ret, nil
}
func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, sha string) (string, error) {
func (t *Transcoder) GetMaster(ctx context.Context, path string, client string, profileId *string, sessionId *string, sha string) (string, error) {
ctx = context.WithoutCancel(ctx)
stream, err := t.getFileStream(ctx, path, sha)
if err != nil {
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
video: nil,
audio: nil,
vhead: -1,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
video: nil,
audio: nil,
vhead: -1,
ahead: -1,
}
return stream.GetMaster(ctx, client), nil
}
@@ -74,6 +76,8 @@ func (t *Transcoder) GetVideoIndex(
video uint32,
quality VideoQuality,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -82,13 +86,15 @@ func (t *Transcoder) GetVideoIndex(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
video: &VideoKey{video, quality},
audio: nil,
vhead: -1,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
video: &VideoKey{video, quality},
audio: nil,
vhead: -1,
ahead: -1,
}
return stream.GetVideoIndex(ctx, video, quality, client)
}
@@ -99,6 +105,8 @@ func (t *Transcoder) GetAudioIndex(
audio uint32,
quality AudioQuality,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -107,12 +115,14 @@ func (t *Transcoder) GetAudioIndex(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
vhead: -1,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
vhead: -1,
ahead: -1,
}
return stream.GetAudioIndex(ctx, audio, quality, client)
}
@@ -124,6 +134,8 @@ func (t *Transcoder) GetVideoSegment(
quality VideoQuality,
segment int32,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -132,13 +144,15 @@ func (t *Transcoder) GetVideoSegment(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
video: &VideoKey{video, quality},
vhead: segment,
audio: nil,
ahead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
video: &VideoKey{video, quality},
vhead: segment,
audio: nil,
ahead: -1,
}
return stream.GetVideoSegment(ctx, video, quality, segment)
}
@@ -150,6 +164,8 @@ func (t *Transcoder) GetAudioSegment(
quality AudioQuality,
segment int32,
client string,
profileId *string,
sessionId *string,
sha string,
) (string, error) {
ctx = context.WithoutCancel(ctx)
@@ -158,12 +174,14 @@ func (t *Transcoder) GetAudioSegment(
return "", err
}
t.clientChan <- ClientInfo{
client: client,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
ahead: segment,
vhead: -1,
client: client,
profileId: profileId,
sessionId: sessionId,
sha: sha,
path: path,
audio: &AudioKey{audio, quality},
ahead: segment,
vhead: -1,
}
return stream.GetAudioSegment(ctx, audio, quality, segment)
}