|
@@ -1282,14 +1282,59 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_
|
|
|
return 0;
|
|
return 0;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+static void unchoke_for_stream(Scheduler *sch, SchedulerNode src);
|
|
|
|
|
+
|
|
|
|
|
+// Unchoke any filter graphs that are downstream of this node, to prevent it
|
|
|
|
|
+// from getting stuck trying to push data to a full queue
|
|
|
|
|
+static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst)
|
|
|
|
|
+{
|
|
|
|
|
+ SchFilterGraph *fg;
|
|
|
|
|
+ SchDec *dec;
|
|
|
|
|
+ SchEnc *enc;
|
|
|
|
|
+ switch (dst->type) {
|
|
|
|
|
+ case SCH_NODE_TYPE_DEC:
|
|
|
|
|
+ dec = &sch->dec[dst->idx];
|
|
|
|
|
+ for (int i = 0; i < dec->nb_outputs; i++)
|
|
|
|
|
+ unchoke_downstream(sch, dec->outputs[i].dst);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case SCH_NODE_TYPE_ENC:
|
|
|
|
|
+ enc = &sch->enc[dst->idx];
|
|
|
|
|
+ for (int i = 0; i < enc->nb_dst; i++)
|
|
|
|
|
+ unchoke_downstream(sch, &enc->dst[i]);
|
|
|
|
|
+ break;
|
|
|
|
|
+ case SCH_NODE_TYPE_MUX:
|
|
|
|
|
+ // muxers are never choked
|
|
|
|
|
+ break;
|
|
|
|
|
+ case SCH_NODE_TYPE_FILTER_IN:
|
|
|
|
|
+ fg = &sch->filters[dst->idx];
|
|
|
|
|
+ if (fg->best_input == fg->nb_inputs) {
|
|
|
|
|
+ fg->waiter.choked_next = 0;
|
|
|
|
|
+ } else {
|
|
|
|
|
+ // ensure that this filter graph is not stuck waiting for
|
|
|
|
|
+ // input from a different upstream demuxer
|
|
|
|
|
+ unchoke_for_stream(sch, fg->inputs[fg->best_input].src);
|
|
|
|
|
+ }
|
|
|
|
|
+ break;
|
|
|
|
|
+ default:
|
|
|
|
|
+ av_unreachable("Invalid destination node type?");
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
|
|
static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
|
|
|
{
|
|
{
|
|
|
while (1) {
|
|
while (1) {
|
|
|
SchFilterGraph *fg;
|
|
SchFilterGraph *fg;
|
|
|
|
|
+ SchDemux *demux;
|
|
|
switch (src.type) {
|
|
switch (src.type) {
|
|
|
case SCH_NODE_TYPE_DEMUX:
|
|
case SCH_NODE_TYPE_DEMUX:
|
|
|
// fed directly by a demuxer (i.e. not through a filtergraph)
|
|
// fed directly by a demuxer (i.e. not through a filtergraph)
|
|
|
- sch->demux[src.idx].waiter.choked_next = 0;
|
|
|
|
|
|
|
+ demux = &sch->demux[src.idx];
|
|
|
|
|
+ if (demux->waiter.choked_next == 0)
|
|
|
|
|
+ return; // prevent infinite loop
|
|
|
|
|
+ demux->waiter.choked_next = 0;
|
|
|
|
|
+ for (int i = 0; i < demux->nb_streams; i++)
|
|
|
|
|
+ unchoke_downstream(sch, demux->streams[i].dst);
|
|
|
return;
|
|
return;
|
|
|
case SCH_NODE_TYPE_DEC:
|
|
case SCH_NODE_TYPE_DEC:
|
|
|
src = sch->dec[src.idx].src;
|
|
src = sch->dec[src.idx].src;
|