Sinkpads operating in pull-mode, with the sourcepads operating in push-mode (or it has no sourcepads when it is a sink), can start a task that will drive the pipeline data flow. Within this task function, you have random access over all of the sinkpads, and push data over the sourcepads. This can come in useful for several different kinds of elements:
Demuxers, parsers and certain kinds of decoders where data comes in unparsed (such as MPEG-audio or video streams), since those will prefer byte-exact (random) access from their input. If possible, however, such elements should be prepared to operate in push-mode mode, too.
Certain kind of audio outputs, which require control over their input data flow, such as the Jack sound server.
First you need to perform a SCHEDULING query to check if the upstream element(s) support pull-mode scheduling. If that is possible, you can activate the sinkpad in pull-mode. Inside the activate_mode function you can then start the task.
#include "filter.h" #include <string.h> static gboolean gst_my_filter_activate (GstPad * pad, GstObject * parent); static gboolean gst_my_filter_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active); static void gst_my_filter_loop (GstMyFilter * filter); G_DEFINE_TYPE (GstMyFilter, gst_my_filter, GST_TYPE_ELEMENT); static void gst_my_filter_init (GstMyFilter * filter) { [..] gst_pad_set_activate_function (filter->sinkpad, gst_my_filter_activate); gst_pad_set_activatemode_function (filter->sinkpad, gst_my_filter_activate_mode); [..] } [..] static gboolean gst_my_filter_activate (GstPad * pad, GstObject * parent) { GstQuery *query; gboolean pull_mode; /* first check what upstream scheduling is supported */ query = gst_query_new_scheduling (); if (!gst_pad_peer_query (pad, query)) { gst_query_unref (query); goto activate_push; } /* see if pull-mode is supported */ pull_mode = gst_query_has_scheduling_mode_with_flags (query, GST_PAD_MODE_PULL, GST_SCHEDULING_FLAG_SEEKABLE); gst_query_unref (query); if (!pull_mode) goto activate_push; /* now we can activate in pull-mode. GStreamer will also * activate the upstream peer in pull-mode */ return gst_pad_activate_mode (pad, GST_PAD_MODE_PULL, TRUE); activate_push: { /* something not right, we fallback to push-mode */ return gst_pad_activate_mode (pad, GST_PAD_MODE_PUSH, TRUE); } } static gboolean gst_my_filter_activate_pull (GstPad * pad, GstObject * parent, GstPadMode mode, gboolean active) { gboolean res; GstMyFilter *filter = GST_MY_FILTER (parent); switch (mode) { case GST_PAD_MODE_PUSH: res = TRUE; break; case GST_PAD_MODE_PULL: if (active) { filter->offset = 0; res = gst_pad_start_task (pad, (GstTaskFunction) gst_my_filter_loop, filter, NULL); } else { res = gst_pad_stop_task (pad); } break; default: /* unknown scheduling mode */ res = FALSE; break; } return res; }
Once started, your task has full control over input and output. The most simple case of a task function is one that reads input and pushes that over its source pad. It's not all that useful, but provides some more flexibility than the old push-mode case that we've been looking at so far.
#define BLOCKSIZE 2048 static void gst_my_filter_loop (GstMyFilter * filter) { GstFlowReturn ret; guint64 len; GstFormat fmt = GST_FORMAT_BYTES; GstBuffer *buf = NULL; if (!gst_pad_query_duration (filter->sinkpad, fmt, &len)) { GST_DEBUG_OBJECT (filter, "failed to query duration, pausing"); goto stop; } if (filter->offset >= len) { GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing"); gst_pad_push_event (filter->srcpad, gst_event_new_eos ()); goto stop; } /* now, read BLOCKSIZE bytes from byte offset filter->offset */ ret = gst_pad_pull_range (filter->sinkpad, filter->offset, BLOCKSIZE, &buf); if (ret != GST_FLOW_OK) { GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret)); goto stop; } /* now push buffer downstream */ ret = gst_pad_push (filter->srcpad, buf); buf = NULL; /* gst_pad_push() took ownership of buffer */ if (ret != GST_FLOW_OK) { GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret)); goto stop; } /* everything is fine, increase offset and wait for us to be called again */ filter->offset += BLOCKSIZE; return; stop: GST_DEBUG_OBJECT (filter, "pausing task"); gst_pad_pause_task (filter->sinkpad); }