Advanced Namespace Tools blog

02 March 2018

Grid radio with Hubfs

Hubfiles are like pipes - you write bytes into them, clients receive the same bytes when they read from them. Even though hubfs has generally been used for textual shells or more recently an irc-like service, binary data can be buffered and multiplexed to clients as well. Recently on the grid, some users have been making use of this to provide a "grid radio" streaming audio service. Making this work well led to another round of changes to the base hubfs code to support this type of usage.

First experiments with paranoid mode

If you start hubfs with no options, mount it and make a hubfile, start a music decoding program reading from it, and then pump some audio data into it, you get a few seconds of playback and then nothing. Something like:

hubfs -s audiotest
mount -c /srv/audiotest /n/audhub
touch /n/audhub/music
audio/mp3dec </n/audhub/music >/dev/audio
cat foo.mp3 >>/n/audhub/music

A few seconds of sound, then nothing. What went wrong? The default size of a hubfs buffer is just a few hundred kilobytes, chosen to match the amount of text data stored in the backscroll of a rio window. When we dumped the mp3 data into the hub, it "raced around the buffer" several times much faster than the playback read the data. As a result we overwrote the hubfs buffer several times and the reading process only was able to playback for a few seconds. We can improve on this somewhat if we make use of hubfs "paranoid mode" in which the speed of the writing process is limited to match that of reading processes, by checking to see if the write pointer has gotten ahead of the read pointers by a certain amount, and then serving reads preferentially if so. If we alter the above commands by adding:

echo fear >/n/audhub/ctl

Prior to dumping the mp3 data in, then the playback begins working. Whenever the writing process gets ahead of the reading process, hubfs forks internally and sleeps the handling of writes while still answering read requests. This isn't a fully adequate solution for a real radio-type service however, because only the "closest" reader ends up gating the speed of the writer. What we really want to do is rate-limit the writing process to a bitrate which matches that of the audio data, regardless of what readers might be doing.

Rate-limiting micro library

The concept of rate-limiting is a component which can be generally useful to other applications than just hubfs, so I decided to design it as a very small "library" which could be used in other contexts. The limiter has the following parameters: the rate at which it will accept data, the minimum time in between bursts of data, and the interval after which the timers and counts reset.

struct Limiter{
	vlong nspb;	/* Minimum nanoseconds per byte */
	vlong sept;	/* Minimum nanoseconds separating messages */
	vlong startt;	/* Start time to calculate intervals from */
	vlong curt;	/* Current time (ns since epoch) */
	vlong lastt;	/* Timestamp of previous message */
	vlong resett;	/* Time after which to reset limit statistics */
	vlong totalbytes;	/* Total bytes written since start time */
	vlong difft;	/* Checks required minimum vs. actual data timing */
	ulong sleept;	/* Milliseconds of sleep time needed to throttle */
};

We initialize a Limiter by providing those parameters, and receive a pointer to the structure:

Limiter*
startlimit(vlong nsperbyte, vlong nsmingap, vlong nstoreset)
{
	Limiter *limiter;

	limiter=(Limiter*)malloc(sizeof(Limiter));
	if(!limiter)
		sysfatal("out of memory");
	limiter->nspb = nsperbyte;
	limiter->sept = nsmingap;
	limiter->resett = nstoreset;
	limiter->startt = 0;
	limiter->lastt = 0;
	limiter->curt = 0;
	return limiter;
}

To implement limiting, we need to make a call to the limit function whenever a write happens. We call it by providing a pointer to the previously initialized Limiter structure and telling it how many bytes have been written:

void
limit(Limiter *lp, vlong bytes)
{
	lp->curt = nsec();
	lp->totalbytes += bytes;
	/* initialize timers if this is the first message written to a hub */
	if(lp->startt == 0){
		lp->startt = lp->curt;
		lp->lastt = lp->curt;
		return;
	}
	/* check if the message has arrived before the minimum interval */
	if(lp->curt - lp->lastt < lp->sept){
		lp->sleept = (lp->sept - (lp->curt - lp->lastt)) / 1000000;
		sleep(lp->sleept);
		lp->lastt = nsec();
		return;
	}
	/* reset timer if the interval between messages is sufficient */
	if(lp->curt - lp->lastt > lp->resett){
		lp->startt = lp->curt;
		lp->lastt = lp->curt;
		lp->totalbytes = bytes;
		return;
	}
	/* check the required elapsed time vs actual elapsed time */
	lp->difft = (lp->nspb * lp->totalbytes) - (lp->curt - lp->startt);
	if(lp->difft > 1000000){
		lp->sleept = lp->difft / 1000000;
		sleep(lp->sleept);
	}
	lp->lastt = nsec();
}

The crucial timekeeping function is nsec() which tells us how many nanoseconds have elapsed since the unix epoch in 1970. At one billion nanoseconds per second, that is a lot of nanoseconds. Because nanoseconds are a very awkward quantity for humans to work with, some of the new flags to hubfs are expressed in more human friendly terms. Rather than "nanoseconds per byte", hubfs accepts a flag that specifies "bytes per second" and then converts to the specification used by the limiter api, as well as accepting the reset time as defined in seconds. Only the minimum separation interval is specified by the user in nanoseconds.

#define SECOND 1000000000
if(applylimits){
	h->bp = bytespersecond;
	h->st = separationinterval;
	h->rt = resettime;
	h->lp = startlimit(SECOND/h->bp, h->st, h->rt * SECOND);
}

Other changes and radio practicalities

In addition to the ratelimiting, two other changes were made. One is providing an optional parameter for maximum message length in bytes. The primary purpose of this parameter is not audio streaming, but flood protection for the use of hubfs to provide irc-like chat services. The other parameter is changing from a compiled-in static buffer aray per hubfile, to a runtime malloced buffer with a flag to define the size in number of bytes. For audio streaming, it is generally desirable to have several megabytes of buffer rather than just a few hundred kilobytes.

With the ratelimiting code in place, it is necessary to set the desired parameters for a smooth stream. Testing involves doing something like

dd -if foo.mp3 |tput -p |audio/mp3dec >/dev/audio

The correct parameter for the hubfs ratelimiting is just a little bit higher than the observed throughput. For the user-provided "UFO radio" service, the pipeline is something like:

hubfs -q 7000000 -b 20000 -s UFORADIO
mount -c /srv/UFORADIO /n/UFO
touch /n/UFO/radio
audio/flacdec < song.flac|dd -conv swab|audio/mp3enc -V 7>> /n/UFO/radio 

To export this service to the grid, from within the grid namespace, a series of commands is used similar to:

bind -c /n/pubregistry /mnt/registry
myip=this.domain
chmod 666 /srv/UFO
gridlisten1 -t -d UFORADIO -m /n/UFO tcp!*!4458 /bin/exportfs -S /srv/UFORADIO

Clients find the address by catting the /mnt/registry/index file, then srv and mount it, and then just "play /n/UFO/radio". The only issue is for clients located at a significant geographic distance. Because the 9p protocol is rather latency sensitive, clients who are across an ocean from the user providing the stream often need to establish another layer of buffering using either a local hubfs which buffers data from the remote radio, or using another program such as "pump". I like using hubfs in this fashion:

hubfs -q 7000000 -s radiobuffer
mount -c /srv/radiobuffer /n/buffer
cat /n/UFO/radio >/n/buffer/radio &
[wait until about 2 mb are buffered]
play /n/buffer/radio

Once the buffer is consumed and the audio begins to click/pause, I can just interrupt the playback and restart it.

Thanks very much to kiwi for UFO radio!