
Capture images from a webcam using ffmpeg

The examples are for Linux and access the web camera through the Video4Linux2 interface. To control web camera settings, use the tool v4l2-ctl. To list connected camera devices, you can use the command: v4l2-ctl --list-devices. On a typical Debian-ish Linux distro, you will also want to add your user to the video and audio groups, so that you can easily access the webcam from a non-desktop session.

Capture to an image file, continually overwriting it with new contents

ffmpeg -y -f v4l2 -video_size 1280x720 -i /dev/video0 \
       -r 0.2 -qscale:v 2 -update 1 /tmp/webcam.jpg
Options breakdown

  • -f v4l2: specify input format explicitly as capture from a Video4Linux2 device
  • -video_size 1280x720: specify video frame size from webcam
  • -i /dev/video0: select input device (a UVC-compatible webcam in my case)
  • -r 0.2: set output frame rate to one per 5 seconds
  • -qscale:v 2: set video quality [JPEG quality in this case], 2 is highest quality
  • -update 1: Image2 muxer option, enable in place update of image file for each video output frame

Point the output file to a place served by your web server to make your camera image available on the web. The ffmpeg command will run until interrupted or killed.

Add a timestamp to captured images

ffmpeg -y -f v4l2 -video_size 1280x720 -i /dev/video0 \
       -r 0.2 \
       -vf "drawtext=text=%{localtime}:fontcolor=white@1.0:fontsize=26:borderw=1:x=980:y=25" \
       -qscale:v 2 -update 1 /tmp/webcam.jpg

Here we have inserted the drawtext video filter into the processing pipeline. We use its text expansion facilities to simply render the local time onto each video frame with filter-argument text=%{localtime}. It is placed in the top right corner of the image using the x and y arguments.

Running as background job

You can ssh to the host which has the web camera connected, and start the ffmpeg capture process as a background job:

ffmpeg -y -loglevel fatal \
       -f v4l2 -video_size 1280x720 -i /dev/video0 \
       -r 0.2 \
       -vf "drawtext=text=%{localtime}:fontcolor=white@1.0:fontsize=26:borderw=1:x=980:y=25" \
       -qscale:v 2 -update 1 /tmp/webcam.jpg \
       </dev/null &>/tmp/webcam-ffmpeg.log & disown $!

This silences ffmpeg to log only fatal errors, runs it in the background and finally detaches the process from your [bash] shell’s job control, to avoid it being killed if you log out. A more polished solution would be to create a systemd service which controls the ffmpeg webcam capture process, running as a dedicated low privilege system user.

Creating a time lapse video from a bunch of image files

As a sort of bonus chapter on this post, here is how to create a time lapse video from a bunch of captured image files. Assuming you have a directory with JPEG images named in such a way that they sort chronologically by their filenames (padded sequence numbers or timestamps), here’s how you can transform them into a video.

VP9 video in WebM container:

ffmpeg -y -f image2 -pattern_type glob -framerate 30 -i webcam-images/\*.jpg \
       -pix_fmt yuv420p -b 1500k timelapsevid.webm

H264 video in MP4 container:

ffmpeg -y -f image2 -pattern_type glob -framerate 30 -i webcam-images/\*.jpg \
       -pix_fmt yuv420p -b 1500k timelapsevid.mp4
Options breakdown

  • -f image2: Input demuxer is Image2, which can read image files.
  • -pattern_type glob: Instructs Image2 demuxer to treat input pattern as file name glob.
  • -framerate 30: Set desired framerate; how many images to display per second in the resulting video.
  • -i webcam-images/\*.jpg: Set input to a glob pattern matching the image files you would like to include in the video. Note that we do not want the shell to expand the glob, but rather pass the asterisk verbatim to ffmpeg.
  • -pix_fmt yuv420p: Set video codec pixel format. YUV420p is selected to ensure compatibility with a broad range of decoders/players.
  • -b 1500k: Set desired bitrate of video file. Newer ffmpeg versions warn that plain -b is ambiguous and prefer the explicit form -b:v 1500k.

Note that all input images should have the same dimensions. Otherwise, you will likely have to add more options to ffmpeg to transform everything to a single suitable video size.

The resulting video files will be suitable for publishing on the web using the <video> tag.


How to make a shell script log JSON-messages

If you have a shell script running in some environment where logs are expected to be formatted as JSON, it can be cumbersome to ensure all commands in the script output valid single line JSON-formatted messages, instead of just raw lines of text, which is what a shell script commonly does. Here I present a technique which can be used so that the script needs very few modifications to be able to output structured JSON instead of raw text lines.

We will set up a bash script so that its regular output is redirected to a JSON encoder co-process automatically. This is set up at the beginning of the script, and subsequent commands’ output will automatically be wrapped as JSON-messages. It requires the jq command to be present on the system where the script runs.

#!/usr/bin/env bash

# 1 Make copies of shell's current stdout and stderr file
# descriptors:
exec 100>&1 200>&2

# 2 Define function which logs arguments or stdin as JSON-message to stdout:
log() {
    if [ "$1" = - ]; then
        jq -Rsc '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
                  "message":.}' 1>&100 2>&200
    else
        jq --arg m "$*" -nc '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
                              "message":$m}' 1>&100 2>&200
    fi
}

# 3 Start a co-process which transforms input lines to JSON messages:
coproc JSON_LOGGER { jq --unbuffered -Rc \
      '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
        "message":.}' 1>&100 2>&200; }

# 4 Finally redirect shell's stdout/stderr to JSON logger
# co-process:
exec 1>&${JSON_LOGGER[1]} 2>&${JSON_LOGGER[1]}

# What follows is whatever you need your script to do

echo Hello brave world
echo '  testing "escaping" and white  space  '
echo >&2 this goes to stderr

uname

# If we want multiple output lines from a single command
# wrapped in a single JSON-message, we need to pipe it to the log function:
curl -sS --head https://api.github.com/ | head -n 3 | log -

# .. otherwise, each curl output line would become its own
# JSON-encoded message, which may not be desirable.

Output to a terminal with color support should look something like this:

{"@timestamp":"2021-07-01T14:55:15+02:00","message":"Hello brave world"}
{"@timestamp":"2021-07-01T14:55:15+02:00","message":"  testing \"escaping\" and white  space  "}
{"@timestamp":"2021-07-01T14:55:15+02:00","message":"this goes to stderr"}
{"@timestamp":"2021-07-01T14:55:15+02:00","message":"Linux"}
{"@timestamp":"2021-07-01T14:55:16+02:00","message":"HTTP/2 200 \r\nserver: GitHub.com\r\ndate: Thu, 01 Jul 2021 12:55:10 GMT\r"}

jq will take care of all the necessary escaping and always produce valid single line JSON-structured messages, regardless of message payload.

Notes

  • You could make the above setup reusable, by putting the code in its own file and sourcing it at the beginning of scripts that need it.
  • High precision timestamps cannot be generated natively in jq. If you need millisecond or higher precision on the log event timestamps, there are ways to do it, depending on the shell and command line tools available. If you have bash >= 5, you can modify the log() function so that the timestamp string is generated using the expression:
    $(date +%Y-%m-%dT%H:%M:%S).${EPOCHREALTIME#*[.,]}$(date +%z). You’ll need to pass this as an --arg to jq for each invocation and use it in the JSON template. It is a bit harder to accomplish for the log transformer co-process, because ideally we’d like only a single persistent jq process running, both for efficiency and to avoid pipeline buffering.
  • You could easily expand the structured log messages by adding JSON fields to the jq templates. For example, you could add a level field, to indicate log level. Or a hostname field using the $HOSTNAME shell variable.
  • Building on the previous point, you could create two separate JSON encoder functions, where one handles stderr messages and logs them at level ERROR, while another one logs regular stdout as level INFO. Then create two co-processes for stdout and stderr, with different jq JSON templates respectively. Finally, redirect main stdout to the first co-process and stderr to the second.
  • For the log transformer co-process, be aware that pipeline buffering can have unfortunate effects, for instance missing the last log events before script exits. This is why jq is invoked with --unbuffered.


Non-blocking I/O server

.. in Kotlin on the JVM

Recently I did a little experiment to learn more about non-blocking I/O on the JVM, using classes from the java.nio package. Until recently this has been unexplored territory for me, except for occasional use of the java.nio.ByteBuffer and file channel classes. It is interesting to see what kind of performance you can get out of a single thread serving thousands of concurrent clients, letting the operating system do all the heavy lifting of multiplexing the I/O requests. Traditionally, you would write a server using dedicated per client threads, which all block and wait while communicating with a client, while the main thread is only responsible for accepting new clients and managing the other threads.

This post will go through the main server code and explain it in detail, mostly so that I can look back at it later for reference. All the code is available in a GitHub repository: nioserver.

The service

The experiment implements a server that functions solely as a receiver of messages over TCP, while also storing those messages in memory and allowing other code to fetch them concurrently. It can support any number of clients, and a client can send any number of messages. The messages themselves are variable length UTF-8 encoded strings, and the stream protocol requires each of them to be ended by a single null byte (an end message marker).
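
To make the stream protocol concrete, here is a minimal sketch of how a client might frame a message before sending it. The function name frameMessage is an assumption made for illustration and is not taken from the nioserver repository.

```kotlin
// Hypothetical helper, not part of the nioserver repository.
// Encodes a message as UTF-8 bytes followed by a single null byte
// (the end message marker).
fun frameMessage(message: String): ByteArray {
    // UTF-8 only produces a zero byte for the character U+0000,
    // so rejecting that character keeps the framing unambiguous.
    require('\u0000' !in message) { "message must not contain a null character" }
    return message.toByteArray(Charsets.UTF_8) + 0.toByte()
}

fun main() {
    val framed = frameMessage("nio")
    println(framed.joinToString(" ")) // 110 105 111 0
}
```

Since every message carries its own terminator, a client can write several framed messages back to back on the same connection.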

The server code

The server is implemented in file NIOServer.kt. It is all written in Kotlin.

class NIOServer: Runnable, AutoCloseable {

The server class implements Runnable to become the target of a dedicated Java thread, while also implementing AutoCloseable to enable Kotlin use { } blocks with server instances.
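
As a rough illustration of how these two interfaces work together, the following sketch uses a hypothetical Worker class as a stand-in for the server. The class and its fields are invented for this example; only the combination of Runnable, AutoCloseable and use { } mirrors the text above.

```kotlin
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for a server class: it runs on its own daemon
// thread until close() is called.
class Worker : Runnable, AutoCloseable {
    @Volatile
    private var running = true
    val stopped = CountDownLatch(1)

    init {
        // Runnable lets the instance itself be the target of the thread.
        Thread(this).apply { isDaemon = true; start() }
    }

    override fun run() {
        while (running) Thread.sleep(10)
        stopped.countDown()
    }

    override fun close() {
        running = false
    }
}

fun main() {
    val worker = Worker()
    worker.use {
        println("worker is running")
    } // close() is called here, also if the block throws
    worker.stopped.await()
    println("worker stopped")
}
```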

Constructor and server initialization

  constructor(host: String = "localhost",
              port: Int = 0,
              receiveBufferSize: Int = 32 * 1024,
              messageStoreCapacity: Int = 1024) {

By default, server instances will listen on some free port on localhost. It allows adjusting the per client message reception buffer size, which impacts server memory usage, and one can also adjust the maximum number of decoded messages the server will queue in memory (as String objects).

    recvBufSize = receiveBufferSize
    messages = ArrayBlockingQueue(messageStoreCapacity)
    selector = Selector.open()
    serverSocket = ServerSocketChannel.open().apply {
      bind(InetSocketAddress(host, port))
      configureBlocking(false)
      register(selector, SelectionKey.OP_ACCEPT)
    }

    // Server runs in a single dedicated daemon thread
    Thread(this).apply {
      name = "NIOServer-thread-${getPort()}"
      isDaemon = true
      start()
    }

  }

The rest of the constructor initializes various networking objects. It opens a java.nio.channels.Selector, binds the server socket and registers the server channel for client accept requests, then immediately starts the server daemon thread so that clients can connect. Importantly, we configure the java.nio.channels.ServerSocketChannel as non-blocking, which is not the default.

The selector is what enables the single server thread to handle many clients simultaneously in an efficient manner. It allows the server code to react to I/O events from the operating system that are ready to be processed immediately without blocking.
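
The initialization steps can be tried in isolation. The sketch below is a simplified, standalone version and not the code from the repository: the function name registerEphemeralServer is an assumption, and it closes everything again right away. It shows that binding to port 0 yields a real port number and that nothing is ready before a client connects.

```kotlin
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel

// Hypothetical helper: returns (bound port, interest ops of the key,
// number of ready keys found by a non-blocking poll).
fun registerEphemeralServer(): Triple<Int, Int, Int> =
    Selector.open().use { selector ->
        ServerSocketChannel.open().use { server ->
            server.bind(InetSocketAddress("localhost", 0)) // port 0: the OS picks a free port
            server.configureBlocking(false)                // register() rejects blocking channels
            val key = server.register(selector, SelectionKey.OP_ACCEPT)
            val port = (server.localAddress as InetSocketAddress).port
            // selectNow() never blocks; no client has connected, so it finds nothing.
            Triple(port, key.interestOps(), selector.selectNow())
        }
    }

fun main() {
    val (port, interestOps, ready) = registerEphemeralServer()
    println("port=$port acceptOnly=${interestOps == SelectionKey.OP_ACCEPT} ready=$ready")
}
```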

Main event loop

After the server instance is constructed, it will be possible to connect to the port it is bound to. If we follow the server side code flow, we jump to the run() method which the server daemon thread executes:

override fun run() {
  while (selector.isOpen) {
    try {
      selector.select { selectionKey ->
        if (selectionKey.isAcceptable) {
          selectionKey.acceptClient()
        }
        if (selectionKey.isReadable) {
          selectionKey.receive()
        }
      }
    } catch (closed: ClosedSelectorException) {
      // Selector was closed while selecting; the loop condition ends the loop.
    } catch (e: IOException) {
      log("error: ${e.javaClass.simpleName}: ${e.message}")
    }
  }
  log("closed")
}

This is the server main event loop, which runs until the server selector is closed. From this point on, the selector is what drives the server code to do useful things. This server can do only two things:

  1. Accept new clients that connect to it.
  2. Receive messages from connected clients.

When either of these two events happens, the selector will make the event available via the select() call, which is where the server thread rests when it is idle.

For anything to start happening, a client needs to connect. Initially, only one channel is registered with the selector, and that is the server socket channel which accepts new connections. When a client connects, the selector will signal that the server socket channel is ready for a non-blocking accept of a new client, via its java.nio.channels.SelectionKey. A selection key can be described as a handle specific to a single channel, and from it you can query what kind of non-blocking operations are ready and access the channel instance itself. Notice that the server does not need any data structures of its own to keep track of individual clients. Everything is hidden behind the selector and the selection keys it provides.

The server handles client connects and data reads through local Kotlin extension functions on the SelectionKey class, since these are both events which are specific to a single client.
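
To see the accept event described above outside of the server class, here is a small loopback sketch. It is an illustration under assumptions: the function name acceptEventObserved and the 5 second timeout are invented for this example, and the selected key set is inspected directly instead of using the callback form of select().

```kotlin
import java.net.InetSocketAddress
import java.nio.channels.SelectionKey
import java.nio.channels.Selector
import java.nio.channels.ServerSocketChannel
import java.nio.channels.SocketChannel

// Connects one client over loopback and checks what the selector reports.
// Returns true if exactly one key is ready, it is acceptable, and it
// refers to the server socket channel.
fun acceptEventObserved(): Boolean =
    Selector.open().use { selector ->
        ServerSocketChannel.open().use { server ->
            server.bind(InetSocketAddress("localhost", 0))
            server.configureBlocking(false)
            server.register(selector, SelectionKey.OP_ACCEPT)

            // A plain blocking client connect is enough to trigger the event.
            SocketChannel.open(server.localAddress).use {
                val ready = selector.select(5000L) // waits until a key is ready or 5 s pass
                val key = selector.selectedKeys().firstOrNull()
                ready == 1 && key != null && key.isAcceptable && key.channel() === server
            }
        }
    }

fun main() {
    println(acceptEventObserved())
}
```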

Event driven server code diagram

The following diagram shows how events propagate from network activity through the kernel and JVM, and then how they are serially processed non-blocking by the single server thread as they become ready.

Figure 1: event driven worker thread

Accepting new clients

private fun SelectionKey.acceptClient() {
  val clientChannel = (channel() as ServerSocketChannel).accept()

  clientChannel.configureBlocking(false)

  clientChannel.register(selector, SelectionKey.OP_READ)
    .attach(ByteBuffer.allocate(recvBufSize))
}

The server code for accepting a new client is rather simple. It gains access to the server socket channel through the selection key channel() method, which it uses to actually call accept(). The result of the accept call is a client channel, which can be used to communicate with the client. Here we ensure the client channel is configured as non-blocking, and we then register the channel in our selector for read operations. (This server never writes back to clients.)

In addition to registering the client channel, we attach an allocated ByteBuffer to the returned client SelectionKey instance. Selection key attachments can be any object which your code requires to be associated with a client channel.
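
The attachment mechanism can be sketched on its own as follows. A java.nio.channels.Pipe source channel stands in for a client socket here, which is an assumption made purely to keep the example free of networking; the function name attachmentRoundTrip is hypothetical as well.

```kotlin
import java.nio.ByteBuffer
import java.nio.channels.Pipe
import java.nio.channels.SelectionKey
import java.nio.channels.Selector

// Registers a channel, attaches a buffer to its key and reads the
// attachment back. Returns the capacity of the retrieved buffer.
fun attachmentRoundTrip(capacity: Int): Int =
    Selector.open().use { selector ->
        val pipe = Pipe.open()
        try {
            pipe.source().configureBlocking(false)
            val key = pipe.source().register(selector, SelectionKey.OP_READ)
            key.attach(ByteBuffer.allocate(capacity))
            // Later, typically in the event loop, the same object is available again.
            (key.attachment() as ByteBuffer).capacity()
        } finally {
            pipe.source().close()
            pipe.sink().close()
        }
    }

fun main() {
    println(attachmentRoundTrip(32 * 1024)) // 32768
}
```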

Receiving messages from client

After a client has been registered with the server selector, the server can begin reading whatever the client sends.

private fun SelectionKey.receive() {
  val channel = channel() as SocketChannel
  val buffer = attachment() as ByteBuffer

The message reception is handled by the SelectionKey.receive() extension function, which is called whenever data can be received from a client without blocking. Both the client channel and the client specific buffer are accessible via the selection key, as can be seen above. The main job of this function is to read bytes from the client channel, extract and decode messages present in the stream, and store those messages in a queue. Also, when a client disconnects, the receive function will unregister it from the selector and close the channel.

ByteBuffer basics

The ByteBuffer class is typically used for data exchange when reading data from or writing data to channels. I think of it as a very convenient wrapper around a piece of memory (a byte[] array), although its inner workings are more complex. It also functions as an abstraction around native operating system memory allocation and memory mapped files, but neither of these advanced features is relevant for this project.

We create an initially empty ByteBuffer by using allocate(n), where n is the desired number of bytes the buffer can hold – its capacity(). Initially when empty, this buffer’s position() will be 0, and its limit() will be equal to its capacity(). These concepts are essential to understanding how to use it and the other methods it provides.

Figure 2: initial empty ByteBuffer of size n.

The limit() functions as a marker and is not always equal to the capacity() of the buffer, as we shall see. When writing to the buffer, the limit tells how far you should write bytes, and when reading it tells how far you should read before stopping.

We can write data into the buffer in several different ways, but here we will focus only on perhaps the simplest form: a relative put (write) operation. Assuming we write three byte values into the buffer by invoking put(1) three times, we end up in the following state:

Figure 3: state after having written three bytes into the buffer

Each put() writes a byte into the current position, then advances the position by one. When the position reaches the limit(), no more bytes can be written.
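
The marker values described so far can be printed directly. This is a minimal sketch with an arbitrarily chosen capacity of 8; the helper name markers is an assumption for illustration.

```kotlin
import java.nio.ByteBuffer

// Formats the three markers of a buffer.
fun markers(b: ByteBuffer): String =
    "position=${b.position()} limit=${b.limit()} capacity=${b.capacity()}"

fun main() {
    val buffer = ByteBuffer.allocate(8)
    println(markers(buffer)) // position=0 limit=8 capacity=8

    repeat(3) { buffer.put(1.toByte()) } // three relative puts
    println(markers(buffer)) // position=3 limit=8 capacity=8
}
```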

Following a series of writes, the next typical thing to do would be to read those bytes out of the buffer. Again, there are several ways to do this, but we will focus on simple get() calls, reading one byte at a time. Since get() just reads from the current position, we cannot call it immediately after the put() calls, because we would get the byte at position 3, which is just a zero that was never written. This is where the flip() method comes into play, which sets the limit to the current position and the position back to zero:

Figure 4: state after flipping the ByteBuffer

Now we can invoke get() three times to read the first three bytes back. For each call, the byte at the current position is returned and the position is advanced by one. The hasRemaining() method can be used to check if there are bytes left to read (or space left to write into) between the position and the limit. It will return true as long as the position is smaller than the limit.
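
Put together, the write, flip and read sequence looks roughly like this. The function writeThenRead is a hypothetical helper, and the fixed capacity of 8 means it only accepts up to eight values.

```kotlin
import java.nio.ByteBuffer

// Writes the given bytes, flips the buffer and reads everything back.
// Assumes at most 8 values; more would throw BufferOverflowException.
fun writeThenRead(vararg values: Byte): List<Byte> {
    val buffer = ByteBuffer.allocate(8)
    values.forEach { buffer.put(it) }  // position advances by one per put
    buffer.flip()                      // limit = old position, position = 0
    val result = mutableListOf<Byte>()
    while (buffer.hasRemaining()) {    // true while position < limit
        result += buffer.get()
    }
    return result
}

fun main() {
    println(writeThenRead(1, 1, 1)) // [1, 1, 1]
}
```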

When working with channels, you will often pass around ByteBuffer instances and do bulk read or write calls, meaning that you don’t have to deal with each byte on its own, but rather read or write some number of bytes (often unknown in advance). This is typically also more efficient. The position, limit and capacity of the ByteBuffer still behave in the exact same way as described in this section.
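
A channel is not needed to try bulk operations, since ByteBuffer also offers bulk put() and get() for byte arrays. The sketch below uses those as a stand-in for channel I/O; the function name roundTrip and the capacity of 64 bytes are assumptions for illustration.

```kotlin
import java.nio.ByteBuffer

// Round-trips a string through a buffer using bulk put/get.
// Assumes the UTF-8 encoding of the text fits in 64 bytes.
fun roundTrip(text: String): String {
    val buffer = ByteBuffer.allocate(64)
    buffer.put(text.toByteArray(Charsets.UTF_8)) // position advances by the array length
    buffer.flip()
    val bytes = ByteArray(buffer.remaining())    // remaining() == limit - position
    buffer.get(bytes)                            // bulk read into the array
    return String(bytes, Charsets.UTF_8)
}

fun main() {
    println(roundTrip("hello")) // hello
}
```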

Lastly, to prepare the buffer for future writes into it, we can clear() it. This would put the buffer back into the following state:

Figure 5: state after the buffer has been cleared

Notice that it is basically like the state in figure 2, except that there are still 1’s written into the first three cells. So a clear is just marker updates and no actual zeroing of memory. Future writes will just overwrite the old garbage bytes, and the position and limit will keep track of the number of valid bytes.
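
That clear() only moves the markers can be verified with an absolute get(index), which reads a byte without touching the position. A minimal sketch, with a hypothetical function name:

```kotlin
import java.nio.ByteBuffer

// Writes three bytes, clears the buffer and reports its state.
fun stateAfterClear(): String {
    val buffer = ByteBuffer.allocate(8)
    repeat(3) { buffer.put(1.toByte()) }
    buffer.clear() // position = 0, limit = capacity, contents untouched
    // Absolute get(0) does not move the position; the old byte is still there.
    return "position=${buffer.position()} limit=${buffer.limit()} first=${buffer.get(0)}"
}

fun main() {
    println(stateAfterClear()) // position=0 limit=8 first=1
}
```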

Back to the NIOServer message reception code

Now that we know some basics about the ByteBuffer, let’s go back to the message reception code in NIOServer. We start by reading however many bytes the client channel has to offer us:

  try {
    val read = channel.read(buffer)

We do this by passing our ByteBuffer instance to channel.read(), which actually writes into the buffer. The method returns the number of bytes read, which may be zero, or -1 if the client has closed its end of the connection, and we store that in the variable read. The following figure shows how the buffer may look after a read of 4 bytes:

Figure 6: buffer after reading 4 bytes from client channel

Next we need to scan our buffer for messages. From the example above, the buffer contains one complete message "nio" ended with a null byte. The logic is built to handle zero or more complete messages in the buffer after a single client read.

    var i = 0
    while (i < buffer.position()) {
      if (buffer.get(i) == END_MESSAGE_MARKER) {
        storeMessage(buffer.duplicate().position(i).flip())
        buffer.limit(buffer.position()).position(i + 1).compact()
        i = 0
      } else i += 1
    }

We always start at the beginning of the buffer and scan for the next end message marker byte, but never beyond the buffer position, which tells us where the client channel stopped writing into the buffer. When an end message null byte is found, we first pass a duplicate().position(i).flip() of the client byte buffer to the storeMessage() method. This is only a shallow copy of the buffer, sharing the same memory allocation, but with its own set of markers. We adjust it so the buffer is suitable for reading the exact part that contains the next message, excluding the null byte. The adjusted buffer will have the following state:

Figure 7: state of ephemeral ByteBuffer duplicate used for message decoding
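
The duplicate-and-flip trick can be tried without a server. In this sketch the function decodeUpTo is a hypothetical stand-in for the relevant part of storeMessage(), whose actual implementation is not shown in this post.

```kotlin
import java.nio.ByteBuffer

// Decodes the bytes in front of index `end` as UTF-8 without touching
// the markers of the original buffer.
fun decodeUpTo(buffer: ByteBuffer, end: Int): String {
    val view = buffer.duplicate() // shares the bytes, has its own position and limit
    view.position(end)
    view.flip()                   // limit = end, position = 0
    return Charsets.UTF_8.decode(view).toString()
}

fun main() {
    val buffer = ByteBuffer.allocate(8)
    buffer.put("nio".toByteArray(Charsets.UTF_8)).put(0.toByte())
    println(decodeUpTo(buffer, 3)) // nio
    println(buffer.position())     // 4, the original buffer is unchanged
}
```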

The storeMessage() method simply reads from the provided buffer from position to limit and decodes the bytes as UTF-8, then stores the string message in an internal queue. Next we reset scanning state and remove the decoded message bytes from the client reception buffer, to free up space:

buffer.limit(buffer.position()).position(i + 1).compact()
i = 0

We set the limit marker to the current position, which at this stage tells us where the client channel stopped writing into the buffer as a whole. Then the position is updated to one beyond the end message null byte marker, and lastly the buffer is compacted. The following figure illustrates the effect of buffer compaction in general:

Figure 8: the effect of the compact() operation
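
As a concrete case, assume the buffer has received the complete message "nio", its null byte and the first two bytes "ab" of a following message. The sketch below applies the same three marker operations; the function name compactDemo and the buffer contents are made up for illustration.

```kotlin
import java.nio.ByteBuffer

// Removes the first message from a buffer holding "nio", a null byte and "ab".
fun compactDemo(): String {
    val buffer = ByteBuffer.allocate(8)
    buffer.put("nio".toByteArray(Charsets.UTF_8))
        .put(0.toByte())
        .put("ab".toByteArray(Charsets.UTF_8))
    buffer.limit(buffer.position()) // limit = 6, end of received bytes
    buffer.position(4)              // one past the null byte at index 3
    buffer.compact()                // copies "ab" to index 0; position = 2, limit = capacity
    val head = String(byteArrayOf(buffer.get(0), buffer.get(1)), Charsets.UTF_8)
    return "position=${buffer.position()} limit=${buffer.limit()} head=$head"
}

fun main() {
    println(compactDemo()) // position=2 limit=8 head=ab
}
```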

After compaction the buffer is actually set up for more writing from the client channel, and we keep only the so far unprocessed bytes, which are copied to the beginning of the buffer. However, the scanning loop runs until there are no more complete messages present in the buffer or the end of the client written bytes is reached.
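
The whole scanning loop can be exercised in isolation. The following is a standalone adaptation, not the server code itself: it collects messages in a list instead of calling storeMessage(), and the value of END_MESSAGE_MARKER is assumed to be a zero byte, as the protocol description suggests.

```kotlin
import java.nio.ByteBuffer

const val END_MESSAGE_MARKER: Byte = 0 // assumed value, per the protocol description

// Extracts all complete messages from a buffer whose position marks the
// end of the received bytes. Incomplete trailing bytes stay in the buffer.
fun extractMessages(buffer: ByteBuffer): List<String> {
    val messages = mutableListOf<String>()
    var i = 0
    while (i < buffer.position()) {
        if (buffer.get(i) == END_MESSAGE_MARKER) {
            val view = buffer.duplicate()
            view.position(i)
            view.flip()
            messages += Charsets.UTF_8.decode(view).toString()
            buffer.limit(buffer.position())
            buffer.position(i + 1)
            buffer.compact()
            i = 0
        } else i += 1
    }
    return messages
}

fun main() {
    val buffer = ByteBuffer.allocate(32)
    buffer.put("one\u0000two\u0000thr".toByteArray(Charsets.UTF_8))
    println(extractMessages(buffer)) // [one, two]
    println(buffer.position())       // 3, the incomplete "thr" is kept
}
```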

    if (!buffer.hasRemaining()) {
      log("error: client buffer overflow, too big message, discarding buffer")
      buffer.clear()
    }

If the buffer is still full after scanning, meaning that it holds no complete message, the server enforces a limitation and discards the buffer contents, for reasons of simplicity. An alternative would be to grow the client buffer dynamically.
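
Growing the buffer is not implemented in the server, but a rough sketch of the idea could look like this. Both the function grow and its maxCapacity parameter are hypothetical; an upper bound of some kind seems advisable so that a misbehaving client cannot exhaust server memory.

```kotlin
import java.nio.ByteBuffer

// Hypothetical alternative to discarding: returns a buffer with twice the
// capacity, holding the same unprocessed bytes and ready for more writes.
fun grow(full: ByteBuffer, maxCapacity: Int): ByteBuffer {
    require(full.capacity() * 2 <= maxCapacity) { "message exceeds $maxCapacity bytes" }
    val bigger = ByteBuffer.allocate(full.capacity() * 2)
    full.flip()      // switch to reading what has been received so far
    bigger.put(full) // bulk copy; bigger.position() ends at the copied length
    return bigger
}

fun main() {
    val small = ByteBuffer.allocate(4)
    small.put(byteArrayOf(1, 2, 3, 4))
    val big = grow(small, maxCapacity = 1024)
    println("capacity=${big.capacity()} position=${big.position()}") // capacity=8 position=4
}
```

In the server, the new buffer would also have to replace the old one as the selection key attachment, so that the next read uses it.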

    if (read == -1) {
      if (buffer.position() > 0) {
        storeMessage(buffer.flip())
      }
      cancel()
      channel.close()
    }

Finally, the last part of SelectionKey.receive() code is responsible for closing client channels, which is triggered by the read call returning -1. If there are remaining unprocessed bytes, those are stored as a last message from the client before closing the channel and cancelling the selection key. (I am not sure if it is necessary to do both.)

Thread safety

Since the server itself runs in a single thread, its internal state needs no synchronization. However, this server allows other threads to concurrently fetch messages from an in-memory queue and uses java.util.concurrent.ArrayBlockingQueue for this purpose. This allows any number of external threads to safely fetch messages from a single NIOServer instance while the server thread itself is adding new incoming messages to the queue.
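
The hand-off between threads can be sketched with a plain producer thread standing in for the server thread. This is an illustration only: the function produceAndConsume is hypothetical, and whether the real server adds messages with put() or another queue method is not shown in this post.

```kotlin
import java.util.concurrent.ArrayBlockingQueue

// One thread adds messages while the calling thread takes them.
fun produceAndConsume(count: Int): List<String> {
    val messages = ArrayBlockingQueue<String>(1024)

    // Stand-in for the server thread, which adds decoded messages.
    val producer = Thread { repeat(count) { messages.put("message $it") } }
    producer.start()

    // take() blocks until a message is available; no extra locking is needed.
    val received = List(count) { messages.take() }
    producer.join()
    return received
}

fun main() {
    println(produceAndConsume(3)) // [message 0, message 1, message 2]
}
```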

Automated tests

Test cases are implemented in NIOServerTests.kt. Perhaps the most interesting case is the last one, which spawns 1000 threads that concurrently connect and send messages, then asserts that the correct number of messages was received by the server.

Notes about performance

As this was only an experiment, not much effort has been put into optimizing the code. The message scanning could be made more efficient by avoiding a re-scan from the start of the buffer at every receive(), perhaps by using the mark() method of ByteBuffer to remember the current scan position. Also, the number of buffer compact() operations could be reduced to at most one per receive() call.
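
One possible shape of such an optimization is sketched below. It is untested against the real server and makes two assumptions: the scan position is remembered in a plain integer field rather than with mark(), and the per-client state lives in a hypothetical ClientState class instead of a bare ByteBuffer attachment.

```kotlin
import java.nio.ByteBuffer

// Hypothetical per-client state: the receive buffer plus the index where
// the previous scan stopped (no marker exists before that index).
class ClientState(capacity: Int) {
    val buffer: ByteBuffer = ByteBuffer.allocate(capacity)
    var scanned = 0
}

fun extractMessagesIncremental(state: ClientState): List<String> {
    val buffer = state.buffer
    val end = buffer.position()  // end of received bytes
    val messages = mutableListOf<String>()
    var start = 0                // start of the current, not yet complete message
    var i = state.scanned        // skip bytes already scanned in earlier calls
    while (i < end) {
        if (buffer.get(i) == 0.toByte()) {
            val view = buffer.duplicate()
            view.position(start)
            view.limit(i)
            messages += Charsets.UTF_8.decode(view).toString()
            start = i + 1
        }
        i += 1
    }
    if (start > 0) {             // at most one compact() per call
        buffer.limit(end)
        buffer.position(start)
        buffer.compact()
    }
    state.scanned = end - start  // equals buffer.position() at this point
    return messages
}

fun main() {
    val state = ClientState(32)
    state.buffer.put("one\u0000tw".toByteArray(Charsets.UTF_8))
    println(extractMessagesIncremental(state)) // [one]
    state.buffer.put("o\u0000".toByteArray(Charsets.UTF_8))
    println(extractMessagesIncremental(state)) // [two]
}
```

If the buffer is ever discarded with clear(), the remembered scan index would have to be reset to zero as well.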

Closing

We will mark the end of this lengthy blog post by showing the code that closes down the server cleanly:

override fun close() {
  selector.close()
  serverSocket.close()
}

When the selector is closed, the server thread stops looping and dies.