In this section we talk about some techniques for dynamically modifying the pipeline. We are talking specifically about changing the pipeline while it is in the PLAYING state without interrupting the flow.
There are some important things to consider when building dynamic pipelines:
When removing elements from the pipeline, make sure that there is no dataflow on unlinked pads because that will cause a fatal pipeline error. Always block source pads (in push mode) or sink pads (in pull mode) before unlinking pads. See also the section called “Changing elements in a pipeline”.
When adding elements to a pipeline, make sure to put the element into the right state, usually the same state as the parent, before allowing dataflow the element. When an element is newly created, it is in the NULL state and will return an error when it receives data. See also the section called “Changing elements in a pipeline”.
When adding elements to a pipeline, GStreamer will by default set the clock and base-time on the element to the current values of the pipeline. This means that the element will be able to construct the same pipeline running-time as the other elements in the pipeline. This means that sinks will synchronize buffers like the other sinks in the pipeline and that sources produce buffers with a running-time that matches the other sources.
When unlinking elements from an upstream chain, always make sure to flush any queued data in the element by sending an EOS event down the element sink pad(s) and by waiting that the EOS leaves the elements (with an event probe).
If you do not do this, you will lose the data which is buffered by the unlinked element. This can result in a simple frame loss (one or more video frames, several milliseconds of audio). However if you remove a muxer (and in some cases an encoder or similar elements) from the pipeline, you risk getting a corrupted file which could not be played properly, as some relevant metadata (header, seek/index tables, internal sync tags) will not be stored or updated properly.
See also the section called “Changing elements in a pipeline”.
A live source will produce buffers with a running-time of the current running-time in the pipeline.
A pipeline without a live source produces buffers with a running-time starting from 0. Likewise, after a flushing seek, those pipelines reset the running-time back to 0.
The running-time can be changed with
gst_pad_set_offset ()
. It is important to
know the running-time of the elements in the pipeline in order
to maintain synchronization.
Adding elements might change the state of the pipeline. Adding a non-prerolled sink, for example, brings the pipeline back to the prerolling state. Removing a non-prerolled sink, for example, might change the pipeline to PAUSED and PLAYING state.
Adding a live source cancels the preroll stage and put the pipeline to the playing state. Adding a live source or other live elements might also change the latency of a pipeline.
Adding or removing elements to the pipeline might change the clock selection of the pipeline. If the newly added element provides a clock, it might be worth changing the clock in the pipeline to the new clock. If, on the other hand, the element that provides the clock for the pipeline is removed, a new clock has to be selected.
Adding and removing elements might cause upstream or downstream elements to renegotiate caps and or allocators. You don't really need to do anything from the application, plugins largely adapt themself to the new pipeline topology in order to optimize their formats and allocation strategy.
What is important is that when you add, remove or change elements in the pipeline, it is possible that the pipeline needs to negotiate a new format and this can fail. Usually you can fix this by inserting the right converter elements where needed. See also the section called “Changing elements in a pipeline”.
GStreamer offers support for doing about any dynamic pipeline modification but it requires you to know a bit of details before you can do this without causing pipeline errors. In the following sections we will demonstrate a couple of typical use-cases.
In the next example we look at the following chain of elements:
- ----. .----------. .---- - element1 | | element2 | | element3 src -> sink src -> sink - ----' '----------' '---- -
We want to change element2 by element4 while the pipeline is in the PLAYING state. Let's say that element2 is a visualization and that you want to switch the visualization in the pipeline.
We can't just unlink element2's sinkpad from element1's source pad because that would leave element1's source pad unlinked and would cause a streaming error in the pipeline when data is pushed on the source pad. The technique is to block the dataflow from element1's source pad before we change element2 by element4 and then resume dataflow as shown in the following steps:
Block element1's source pad with a blocking pad probe. When the pad is blocked, the probe callback will be called.
Inside the block callback nothing is flowing between element1 and element2 and nothing will flow until unblocked.
Unlink element1 and element2.
Make sure data is flushed out of element2. Some elements might internally keep some data, you need to make sure not to lose data by forcing it out of element2. You can do this by pushing EOS into element2, like this:
Put an event probe on element2's source pad.
Send EOS to element2's sinkpad. This makes sure the all the data inside element2 is forced out.
Wait for the EOS event to appear on element2's source pad. When the EOS is received, drop it and remove the event probe.
Unlink element2 and element3. You can now also remove element2 from the pipeline and set the state to NULL.
Add element4 to the pipeline, if not already added. Link element4 and element3. Link element1 and element4.
Make sure element4 is in the same state as the rest of the elements in the pipeline. It should be at least in the PAUSED state before it can receive buffers and events.
Unblock element1's source pad probe. This will let new data into element4 and continue streaming.
The above algorithm works when the source pad is blocked, i.e. when there is dataflow in the pipeline. If there is no dataflow, there is also no point in changing the element (just yet) so this algorithm can be used in the PAUSED state as well.
Let show you how this works with an example. This example changes the video effect on a simple pipeline every second.
#include <gst/gst.h> static gchar *opt_effects = NULL; #define DEFAULT_EFFECTS "identity,exclusion,navigationtest," \ "agingtv,videoflip,vertigotv,gaussianblur,shagadelictv,edgetv" static GstPad *blockpad; static GstElement *conv_before; static GstElement *conv_after; static GstElement *cur_effect; static GstElement *pipeline; static GQueue effects = G_QUEUE_INIT; static GstPadProbeReturn event_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GMainLoop *loop = user_data; GstElement *next; if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_DATA (info)) != GST_EVENT_EOS) return GST_PAD_PROBE_PASS; gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info)); /* push current effect back into the queue */ g_queue_push_tail (&effects, gst_object_ref (cur_effect)); /* take next effect from the queue */ next = g_queue_pop_head (&effects); if (next == NULL) { GST_DEBUG_OBJECT (pad, "no more effects"); g_main_loop_quit (loop); return GST_PAD_PROBE_DROP; } g_print ("Switching from '%s' to '%s'..\n", GST_OBJECT_NAME (cur_effect), GST_OBJECT_NAME (next)); gst_element_set_state (cur_effect, GST_STATE_NULL); /* remove unlinks automatically */ GST_DEBUG_OBJECT (pipeline, "removing %" GST_PTR_FORMAT, cur_effect); gst_bin_remove (GST_BIN (pipeline), cur_effect); GST_DEBUG_OBJECT (pipeline, "adding %" GST_PTR_FORMAT, next); gst_bin_add (GST_BIN (pipeline), next); GST_DEBUG_OBJECT (pipeline, "linking.."); gst_element_link_many (conv_before, next, conv_after, NULL); gst_element_set_state (next, GST_STATE_PLAYING); cur_effect = next; GST_DEBUG_OBJECT (pipeline, "done"); return GST_PAD_PROBE_DROP; } static GstPadProbeReturn pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) { GstPad *srcpad, *sinkpad; GST_DEBUG_OBJECT (pad, "pad is blocked now"); /* remove the probe first */ gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info)); /* install new probe for EOS */ srcpad = gst_element_get_static_pad (cur_effect, "src"); gst_pad_add_probe (srcpad, GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, event_probe_cb, user_data, NULL); gst_object_unref (srcpad); /* push EOS into the element, the probe will be fired when the * EOS leaves the effect and it has thus drained all of its data */ sinkpad = gst_element_get_static_pad (cur_effect, "sink"); gst_pad_send_event (sinkpad, gst_event_new_eos ()); gst_object_unref (sinkpad); return GST_PAD_PROBE_OK; } static gboolean timeout_cb (gpointer user_data) { gst_pad_add_probe (blockpad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM, pad_probe_cb, user_data, NULL); return TRUE; } static gboolean bus_cb (GstBus * bus, GstMessage * msg, gpointer user_data) { GMainLoop *loop = user_data; switch (GST_MESSAGE_TYPE (msg)) { case GST_MESSAGE_ERROR:{ GError *err = NULL; gchar *dbg; gst_message_parse_error (msg, &err, &dbg); gst_object_default_error (msg->src, err, dbg); g_clear_error (&err); g_free (dbg); g_main_loop_quit (loop); break; } default: break; } return TRUE; } int main (int argc, char **argv) { GOptionEntry options[] = { {"effects", 'e', 0, G_OPTION_ARG_STRING, &opt_effects, "Effects to use (comma-separated list of element names)", NULL}, {NULL} }; GOptionContext *ctx; GError *err = NULL; GMainLoop *loop; GstElement *src, *q1, *q2, *effect, *filter1, *filter2, *sink; gchar **effect_names, **e; ctx = g_option_context_new (""); g_option_context_add_main_entries (ctx, options, NULL); g_option_context_add_group (ctx, gst_init_get_option_group ()); if (!g_option_context_parse (ctx, &argc, &argv, &err)) { g_print ("Error initializing: %s\n", err->message); g_clear_error (&err); g_option_context_free (ctx); return 1; } g_option_context_free (ctx); if (opt_effects != NULL) effect_names = g_strsplit (opt_effects, ",", -1); else effect_names = g_strsplit (DEFAULT_EFFECTS, ",", -1); for (e = effect_names; e != NULL && *e != NULL; ++e) { GstElement *el; el = gst_element_factory_make (*e, NULL); if (el) { g_print ("Adding effect '%s'\n", *e); g_queue_push_tail (&effects, el); } } pipeline = gst_pipeline_new ("pipeline"); src = gst_element_factory_make ("videotestsrc", NULL); g_object_set (src, "is-live", TRUE, NULL); filter1 = gst_element_factory_make ("capsfilter", NULL); gst_util_set_object_arg (G_OBJECT (filter1), "caps", "video/x-raw, width=320, height=240, " "format={ I420, YV12, YUY2, UYVY, AYUV, Y41B, Y42B, " "YVYU, Y444, v210, v216, NV12, NV21, UYVP, A420, YUV9, YVU9, IYU1 }"); q1 = gst_element_factory_make ("queue", NULL); blockpad = gst_element_get_static_pad (q1, "src"); conv_before = gst_element_factory_make ("videoconvert", NULL); effect = g_queue_pop_head (&effects); cur_effect = effect; conv_after = gst_element_factory_make ("videoconvert", NULL); q2 = gst_element_factory_make ("queue", NULL); filter2 = gst_element_factory_make ("capsfilter", NULL); gst_util_set_object_arg (G_OBJECT (filter2), "caps", "video/x-raw, width=320, height=240, " "format={ RGBx, BGRx, xRGB, xBGR, RGBA, BGRA, ARGB, ABGR, RGB, BGR }"); sink = gst_element_factory_make ("ximagesink", NULL); gst_bin_add_many (GST_BIN (pipeline), src, filter1, q1, conv_before, effect, conv_after, q2, sink, NULL); gst_element_link_many (src, filter1, q1, conv_before, effect, conv_after, q2, sink, NULL); gst_element_set_state (pipeline, GST_STATE_PLAYING); loop = g_main_loop_new (NULL, FALSE); gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), bus_cb, loop); g_timeout_add_seconds (1, timeout_cb, loop); g_main_loop_run (loop); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (pipeline); return 0; }
Note how we added videoconvert elements before and after the effect. This is needed because some elements might operate in different colorspaces than other elements. By inserting the conversion elements you ensure that the right format can be negotiated at any time.