
Locking critical sections in shell scripts

A critical section is some piece of code which, due to its nature and effects, should be executed by at most one thread or process at a time. If such code is executed concurrently, the results often become undefined and arbitrary. Atomically locking a shared resource is a common pattern to synchronize execution of critical sections and ensure mutually exclusive access.

Shell scripts mostly deal in processes and files, and there are several common scenarios where code is actually a critical section. If such code is run in several processes concurrently, it could introduce race conditions and arbitrary results. Consider a script that starts a background process if it is not already running – a typical pattern to start a singleton daemon process. Such code is a critical section, because you can end up with two running daemons if the code runs concurrently (and the daemon does not check for other instances of itself). Another good example is multiple scripts writing to a shared file, or even a shared directory structure.

There are a few strategies to implement locking in shell scripts, some better than others. In this post, I will focus on one of the most robust ways: using flock(1). This nice tool gives you access to kernel-level file locking from your shell. It has some clear advantages over traditional existence-based file locking:

  1. It is truly atomic.
  2. The kernel manages the locks and releases them automatically when lock owning processes die. So no more stale lock files to clean up.
  3. You can block and wait for a lock, indefinitely or with a timeout, and instantly get it when another process frees it. (No more lock-polling loops with sleeps.)

It is important to understand that file locks are tied to both a file on the file system and running processes with open file descriptors to it. Even if a file used for locking exists on the file system, it does not mean the lock is taken! The file system acts as a namespace of shared resources to which we can attach locks. Also note that the locks are advisory only – if a process does not care to check for locks, it will not participate in any synchronization and can do whatever it pleases.
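
Since the locks live in the kernel, you can probe whether anything currently holds a lock on a file without blocking. A quick sketch, using flock's ability to run a command under the lock (here against the job.lock file used in the examples below):

if flock -n job.lock true; then
    echo "nothing holds a lock on job.lock right now"
else
    echo "some process holds a lock on job.lock"
fi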

Shell script with locking functions

We will look at a script which needs to protect a critical section with locking. The locking shall be done on a common file job.lock, which means any process with access to that file can obtain or check for a lock. To code along, you can copy the script to your own file and run the examples.

job.sh

#!/bin/bash

lock_acquire() {
    # Open a file descriptor to lock file
    exec {LOCKFD}>job.lock || return 1

    # Block until an exclusive lock can be obtained on the file descriptor
    flock -x $LOCKFD
}

lock_release() {
    test "$LOCKFD" || return 1
    
    # Close lock file descriptor, thereby releasing exclusive lock
    exec {LOCKFD}>&- && unset LOCKFD
}

lock_acquire || { echo >&2 "Error: failed to acquire lock"; exit 1; }

# --- Begin critical section ---

if [ -f job.dat ]; then
    value=$(<job.dat)
else
    value=0
fi
value=$((value + 1))

echo $value >job.dat

# --- End critical section ---

lock_release

The lock_acquire function uses flock -x N to obtain an exclusive lock on file descriptor N. Since the file descriptor is opened by the script process itself, it will be the owner of the lock after flock exits. flock is able to lock the descriptor because the descriptor is inherited from the shell process that started it. The critical section reads a number from a file if it exists, increments it by one, and writes the updated number back to the file.
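
If blocking indefinitely on the lock is undesirable, flock's -w (wait with timeout) and -n (non-blocking) options let the acquisition fail instead. A sketch of a timeout variant of the function above (the name and default timeout are illustrative):

lock_acquire_timeout() {
    local timeout=${1:-10}

    # Open a file descriptor to lock file
    exec {LOCKFD}>job.lock || return 1

    # Give up if an exclusive lock cannot be obtained within $timeout seconds
    if ! flock -x -w "$timeout" "$LOCKFD"; then
        exec {LOCKFD}>&- && unset LOCKFD
        return 1
    fi
}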

Testing

First we’ll run the job script once:

$ bash job.sh 
$ ls
job.dat  job.lock  job.sh
$ cat job.dat 
1

A job.dat file is produced with a value of 1, which is entirely expected and not very interesting.

Next we’ll start 100 job processes asynchronously as fast as possible in the background, which means that many of them will run concurrently. We do this two times:

$ rm job.dat 
$ (for i in {1..100}; do bash job.sh & done; wait)
$ cat job.dat 
100
$ (for i in {1..100}; do bash job.sh & done; wait)
$ cat job.dat 
200

The for loop is started in a subshell to avoid job control messages. The data has been incremented exactly 100 times after the first run, and by another 100 after the second. The code in the critical section reads, updates and then writes to the shared file, and doing this without locking would not work consistently.

Actually, let us try that, by commenting out the lock_acquire call in the script:

[...]

#lock_acquire || { echo >&2 "Error: failed to acquire lock"; exit 1; }

# --- Begin critical section ---

Then we run the test again:

$ rm job.dat
$ (for i in {1..100}; do bash job.sh & done; wait)
$ cat job.dat
3
$ (for i in {1..100}; do bash job.sh & done; wait)
$ cat job.dat
14

This ends with a final result of 14, which is clearly incorrect and arbitrary. The results will vary with each run and depend on things like the speed of your computer.

Releasing the lock?

In this case the script does not actually need to release the lock right before it exits, because the kernel will do that automatically when the process exits anyway. We will try it by re-enabling the locking call and commenting out the lock_release call:

[...]

lock_acquire || { echo >&2 "Error: failed to acquire lock"; exit 1; }

# --- Begin critical section ---
[...]
# --- End critical section ---

#lock_release

And run the test:

$ rm job.dat
$ (for i in {1..100}; do bash job.sh & done; wait)
$ (for i in {1..100}; do bash job.sh & done; wait)
$ cat job.dat
200

It still works fine.

Starting a daemon process from your shell init scripts

A common use case is starting a single daemon process from your shell init scripts, unless one is already running – you only ever want one instance of it. Consider the following:

if ! ps -ef|grep some-daemon|grep -qv grep; then
    some-daemon & pid=$!
    echo Started some-daemon with pid $pid
fi

This code is racy unless it is protected by locking. If you were to start two terminals more or less simultaneously, both executing your shell init scripts, you could possibly end up with two running daemon processes.

To protect this with flock, you could do the following:

if exec {bashrc_fd}<~/.bashrc && flock -nx $bashrc_fd; then
    # --- Begin critical section ---
    if ! ps -ef|grep some-daemon|grep -qv grep; then
        some-daemon & pid=$!
        echo Started some-daemon with pid $pid
    fi
    # --- End critical section ---

    flock -u $bashrc_fd && exec {bashrc_fd}>&-
fi

Here we open a read-only file descriptor to ~/.bashrc and then try to grab an exclusive lock on it, non-blocking, using option -n. If some other bash process is already executing that part of the init file, flock will fail and exit immediately with a non-zero code, so the block is skipped. The effect is that only one bash process will execute the code, and others running at the same time will skip it.

You may notice that we explicitly release the lock using flock -u $bashrc_fd after the critical section. Normally it is enough to close the file descriptor used for locking, but child processes may inherit such descriptors and keep them open. So the parent process closing its copy of the descriptor may not be enough to actually release the lock. Therefore we do it explicitly.
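
If your bash supports it, an alternative is to prevent the inheritance in the first place, by closing the lock descriptor in the daemon's own redirections when starting it (a sketch, relying on the same {varname} redirection form used above):

some-daemon {bashrc_fd}>&- & pid=$!

With this, the daemon never holds a copy of the locked descriptor, so closing it in the parent is enough.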

Closing notes

The manual page for the flock command lists a few good examples of how you can use it in your scripts. However, none of those examples show how you can make the current shell process own and control the locks, without wrapping critical sections/commands in subshells or extra flock invocations.

The manual page for the flock(2) system call is a good read if you are interested in more details about how it works.

Read more about handling file descriptors with Bash in this part of the bash(1) manual.


How to make a shell script log JSON-messages

If you have a shell script running in some environment where logs are expected to be formatted as JSON, it can be cumbersome to ensure that all commands in the script output valid single line JSON-formatted messages, instead of the raw lines of text a shell script commonly produces. Here I present a technique that requires very few modifications to the script to make it output structured JSON instead of raw text lines.

We will set up a bash script so that its regular output is redirected to a JSON encoder co-process automatically. This is done at the beginning of the script, and subsequent commands' output will automatically be wrapped in JSON messages. It requires the jq command to be present on the system where the script runs.

#!/usr/bin/env bash

# 1 Make copies of shell's current stdout and stderr file
# descriptors:
exec 100>&1 200>&2

# 2 Define function which logs arguments or stdin as JSON-message to stdout:
log() {
    if [ "$1" = - ]; then
        jq -Rsc '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
                  "message":.}' 1>&100 2>&200
    else
        jq --arg m "$*" -nc '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
                              "message":$m}' 1>&100 2>&200
    fi
}

# 3 Start a co-process which transforms input lines to JSON messages:
coproc JSON_LOGGER { jq --unbuffered -Rc \
      '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
        "message":.}' 1>&100 2>&200; }

# 4 Finally redirect shell's stdout/stderr to JSON logger
# co-process:
exec 1>&${JSON_LOGGER[1]} 2>&${JSON_LOGGER[1]}

# What follows is whatever you need your script to do

echo Hello brave world
echo '  testing "escaping" and white  space  '
echo >&2 this goes to stderr

uname

# If we want multiple output lines from a single command
# wrapped in a single JSON-message, we need to pipe it to the log function:
curl -sS --head https://api.github.com/|head -n 3|log -

# .. otherwise, each curl output line would become its own
# JSON-encoded message, which may not be desirable.

Output to a terminal with color support should look something like this:

{"@timestamp":"2021-07-01T14:55:15+02:00","message":"Hello brave world"}
{"@timestamp":"2021-07-01T14:55:15+02:00","message":"  testing \"escaping\" and white  space  "}
{"@timestamp":"2021-07-01T14:55:15+02:00","message":"this goes do stderr"}
{"@timestamp":"2021-07-01T14:55:15+02:00","message":"Linux"}
{"@timestamp":"2021-07-01T14:55:16+02:00","message":"HTTP/2 200 \r\nserver: GitHub.com\r\ndate: Thu, 01 Jul 2021 12:55:10 GMT\r"}

Jq will take care of all the necessary escaping and always produce valid single line JSON-structured messages, regardless of message payload.

Notes

  • You could make the above setup reusable, by putting the code in its own file and sourcing it at the beginning of scripts that need it.
  • High precision timestamps cannot be generated natively in jq. If you need millisecond or better precision on the log event timestamps, there are ways to do it, depending on the shell and command line tools available. If you have bash >= 5, you can modify the log() function so that the timestamp string is generated using the expression:
    $(date +%Y-%m-%dT%H:%M:%S).${EPOCHREALTIME#*[.,]}$(date +%z). You'll need to pass this as an --arg to jq for each invocation and use it in the JSON template. It is harder to accomplish for the log transformer co-process, because ideally we'd like only a single persistent jq process running, both for efficiency and to avoid pipeline buffering.
  • You could easily expand the structured log messages by adding JSON fields to the jq templates. For example, you could add a level field, to indicate log level. Or a hostname field using the $HOSTNAME shell variable.
  • Building on the previous point, you could create two separate JSON encoder functions, where one handles stderr messages and logs them at level ERROR, while the other logs regular stdout at level INFO. Then create two co-processes for stdout and stderr, with different jq JSON templates respectively. Finally, redirect the main stdout to the first co-process and stderr to the second (see the sketch after this list).
  • For the log transformer co-process, be aware that pipeline buffering can have unfortunate effects, for instance missing the last log events before script exits. This is why jq is invoked with --unbuffered.
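
Building on those notes, here is a minimal sketch of such a two-channel setup. The field names and levels are illustrative, and note that bash formally supports only one active co-process at a time (it may print a warning when a second one is started), so we copy each co-process' write descriptor right away:

#!/usr/bin/env bash

# Keep copies of the real stdout/stderr for the encoders to write to:
exec 100>&1 200>&2

# INFO encoder for regular stdout:
coproc INFO_LOGGER { jq --unbuffered -Rc \
      '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
        "level":"INFO", "message":.}' 1>&100 2>&200; }
exec {info_fd}>&"${INFO_LOGGER[1]}"

# ERROR encoder for stderr:
coproc ERROR_LOGGER { jq --unbuffered -Rc \
      '{"@timestamp": now|strftime("%Y-%m-%dT%H:%M:%S%z"),
        "level":"ERROR", "message":.}' 1>&100 2>&200; }
exec {error_fd}>&"${ERROR_LOGGER[1]}"

# Route the script's stdout and stderr to their respective encoders:
exec 1>&$info_fd 2>&$error_fd

echo this becomes an INFO message
echo >&2 this becomes an ERROR message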


Non-blocking I/O server

.. in Kotlin on the JVM

Recently I did a little experiment to learn more about non-blocking I/O on the JVM, using classes from the java.nio package. Until recently this has been unexplored territory for me, except for occasional use of the java.nio.ByteBuffer and file channel classes. It is interesting to see what kind of performance you can get out of a single thread serving thousands of concurrent clients, letting the operating system do all the heavy lifting of multiplexing the I/O requests. Traditionally, you would write a server using dedicated per-client threads, which all block and wait while communicating with a client, while the main thread is only responsible for accepting new clients and managing the other threads.

This post will go through the main server code and explain it in detail, mostly so that I can look back at it later for reference. All the code is available in a Github repository: nioserver.

The service

The experiment implements a server that functions solely as a receiver of messages over TCP, storing those messages in memory and allowing other code to fetch them concurrently. It supports any number of clients, and a client can send any number of messages. The messages themselves are variable length UTF-8 encoded strings, and the stream protocol requires each of them to be terminated by a single null byte (an end message marker).
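
To make the stream protocol concrete, a minimal hypothetical client could look like this (host and port are assumptions for illustration):

import java.net.Socket

fun main() {
    // Connect and send two null-terminated UTF-8 messages
    Socket("localhost", 9999).use { socket ->
        val out = socket.getOutputStream()
        for (msg in listOf("hello", "nio")) {
            out.write(msg.toByteArray(Charsets.UTF_8))
            out.write(0)   // the end message marker
        }
        out.flush()
    }
}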

The server code

The server is implemented in file NIOServer.kt. It is all written in Kotlin.

class NIOServer: Runnable, AutoCloseable {

The server class implements Runnable to become the target of a dedicated Java thread, while also implementing AutoCloseable to enable Kotlin use { } blocks with server instances.
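
For example, a server instance can be scoped with use { } like this (the port is chosen for illustration):

NIOServer(port = 9999).use { server ->
    // interact with the server here; close() is called automatically at the end
}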

Constructor and server initialization

  constructor(host: String = "localhost",
              port: Int = 0,
              receiveBufferSize: Int = 32 * 1024,
              messageStoreCapacity: Int = 1024) {

By default, server instances will listen on some free port on localhost. The constructor allows adjusting the per-client message reception buffer size, which impacts server memory usage, as well as the maximum number of decoded messages the server will queue in memory (as String objects).

    recvBufSize = receiveBufferSize
    messages = ArrayBlockingQueue(messageStoreCapacity)
    selector = Selector.open()
    serverSocket = ServerSocketChannel.open().apply {
      bind(InetSocketAddress(host, port))
      configureBlocking(false)
      register(selector, SelectionKey.OP_ACCEPT)
    }

    // Server runs in a single dedicated daemon thread
    Thread(this).apply {
      name = "NIOServer-thread-${getPort()}"
      isDaemon = true
      start()
    }

  }

The rest of the constructor initializes various networking objects. It binds the server socket, opens a java.nio.channels.Selector and registers the server channel for client accept requests, then immediately starts the server daemon thread so that clients can connect. Importantly, we configure the java.nio.channels.ServerSocketChannel as non-blocking, which is not the default.

The selector is what enables the single server thread to handle many clients simultaneously in an efficient manner. It allows the server code to react to I/O events from the operating system that are ready to be processed immediately without blocking.

Main event loop

After the server instance is constructed, it will be possible to connect to the port it is bound to. If we follow the server side code flow, we jump to the run() method which the server daemon thread executes:

override fun run() {
  while (selector.isOpen) {
    try {
      selector.select { selectionKey ->
        if (selectionKey.isAcceptable) {
          selectionKey.acceptClient()
        }
        if (selectionKey.isReadable) {
          selectionKey.receive()
        }
      }
    } catch (closed: ClosedSelectorException) {
    } catch (e: IOException) {
      log("error: ${e.javaClass.simpleName}: ${e.message}")
    }
  }
  log("closed")
}

This is the server main event loop, which runs until the server selector is closed. From this point on, the selector is what drives the server code to do useful things. This server can do only two things:

  1. Accept new clients that connect to it.
  2. Receive messages from connected clients.

When either of these two events happens, the selector will make the event available via the select() call, which is where the server thread rests when it is idle.

For anything to start happening, a client needs to connect. Initially, only one channel is registered with the selector: the server socket channel which accepts new connections. When a client connects, the selector will signal that the server socket channel is ready for a non-blocking accept of a new client, via its java.nio.channels.SelectionKey. A selection key can be described as a handle specific to a single channel; from it you can query what kind of non-blocking operations are ready and access the channel instance itself. Notice that the server does not need any data structures of its own to keep track of individual clients – everything is hidden behind the selector and the selection keys it provides.

The server handles client connects and data reads through local Kotlin extension functions on the SelectionKey class, since these are both events which are specific to a single client.

Event driven server code diagram

The following diagram shows how events propagate from network activity through the kernel and JVM, and how they are then processed serially, without blocking, by the single server thread as they become ready.

Figure 1: event driven worker thread

Accepting new clients

private fun SelectionKey.acceptClient() {
  val clientChannel = (channel() as ServerSocketChannel).accept()

  clientChannel.configureBlocking(false)

  clientChannel.register(selector, SelectionKey.OP_READ)
   .attach(ByteBuffer.allocate(recvBufSize))
}

The server code for accepting a new client is rather simple. It gains access to the server socket channel through the selection key channel() method, which it uses to actually call accept(). The result of the accept call is a client channel, which can be used to communicate with the client. Here we ensure the client channel is configured as non-blocking, and we then register the channel in our selector for read operations. (This server never writes back to clients.)

In addition to registering the client channel, we attach an allocated ByteBuffer to the returned client SelectionKey instance. Selection key attachments can be any object which your code requires to be associated with a client channel.

Receiving messages from client

After a client has been registered with the server selector, the server can begin reading whatever the client sends.

private fun SelectionKey.receive() {
  val channel = channel() as SocketChannel
  val buffer = attachment() as ByteBuffer

The message reception is handled by the SelectionKey.receive() extension function, which is called whenever data can be received from a client without blocking. Both the client channel and the client specific buffer are accessible via the selection key, as can be seen above. The main job of this function is to read bytes from the client channel, extract and decode messages present in the stream, and store those messages in a queue. Also, when a client disconnects, the receive function will unregister it from the selector and close the channel.

ByteBuffer basics

The ByteBuffer class is typically used for data exchange when reading or writing data to channels. I think of it as a very convenient wrapper around a piece of memory (a byte[] array), although its inner workings are more complex. It also functions as an abstraction over native operating system memory allocation and memory mapped files, but neither of those advanced features is relevant for this project.

We create an initially empty ByteBuffer by using allocate(n), where n is the desired number of bytes the buffer can hold – its capacity(). Initially when empty, this buffer’s position() will be 0, and its limit() will be equal to its capacity(). These concepts are essential to understanding how to use it and the other methods it provides.

Figure 2: initial empty ByteBuffer of size n.

The limit() functions as a marker and is not always equal to the capacity() of the buffer, as we shall see. When writing to the buffer, the limit tells how far you should write bytes, and when reading it tells how far you should read before stopping.

We can write data into the buffer in several different ways, but here we will focus only on perhaps the simplest form: a relative put (write) operation. Assuming we write three byte values into the buffer by invoking put(1) three times, we end up in the following state:

Figure 3: state after having written three bytes into the buffer

Each put() writes a byte into the current position, then advances the position by one. When the position reaches the limit(), no more bytes can be written.

Following a series of writes, the next typical thing to do would be to read those bytes out of the buffer. Again, there are several ways to do this, but we will focus on simple get() calls, reading one byte at a time. Since get() just reads from the current position, we cannot call it immediately after the put() calls, because we would get the byte at position 3, which is just a zero we never wrote. This is where the flip() method comes into play; it sets the limit to the current position and the position back to zero:

Figure 4: state after flipping the ByteBuffer

Now we can invoke get() three times to read the first three bytes back. For each call, the byte at the current position is returned and the position is advanced by one. The hasRemaining() method can be used to check if there are bytes left to read (or space left to write into) between the position and the limit. It will return true as long as the position is smaller than the limit.

When working with channels, you will often pass around ByteBuffer instances and do bulk read or write calls, meaning that you don’t have to deal with each byte on its own, but rather read/write some amount of bytes (often unknown in advance). This is typically also more efficient. The position, limit and capacity of the ByteBuffer still behave in the exact same way as described in this section.

Lastly, to prepare the buffer for future writes into it, we can clear() it. This would put the buffer back into the following state:

Figure 5: state after the buffer has been cleared

Notice that it is basically like the state in figure 2, except that there are still 1's written into the first three cells. So a clear is just a pair of marker updates, with no actual zeroing of memory. Future writes will simply overwrite the old garbage bytes, and the position and limit will keep track of the number of valid bytes.
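
The whole put/flip/get/clear cycle described above can be condensed into a few lines (a standalone snippet, not taken from the server code):

import java.nio.ByteBuffer

fun main() {
    val buf = ByteBuffer.allocate(8)     // position=0, limit=8, capacity=8 (figure 2)
    repeat(3) { buf.put(1.toByte()) }    // position=3, limit=8 (figure 3)
    buf.flip()                           // position=0, limit=3 (figure 4)
    while (buf.hasRemaining()) {
        println(buf.get())               // prints 1 three times
    }
    buf.clear()                          // position=0, limit=8 again (figure 5)
}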

Back to the NIOServer message reception code

Now that we know some basics about the ByteBuffer, let’s go back to the message reception code in NIOServer. We start by reading however many bytes the client channel has to offer us:

  try {
    val read = channel.read(buffer)

We do this by passing our ByteBuffer instance to channel.read(), which actually writes into the buffer. The method returns the number of bytes read, which may be zero, and we store that in variable read. The following figure shows how the buffer may look after a read of 4 bytes:

Figure 6: buffer after reading 4 bytes from client channel

Next we need to scan our buffer for messages. From the example above, the buffer contains one complete message "nio" ended with a null byte. The logic is built to handle zero or more complete messages in the buffer after a single client read.

    var i = 0
    while (i < buffer.position()) {
      if (buffer.get(i) == END_MESSAGE_MARKER) {
        storeMessage(buffer.duplicate().position(i).flip())
        buffer.limit(buffer.position()).position(i + 1).compact()
        i = 0
      } else i += 1
    }

We always start at the beginning of the buffer and scan for the next end message marker byte, but never beyond the buffer position, which tells us where the client channel stopped writing into the buffer. When an end message null byte is found, we first pass a duplicate().position(i).flip() of the client byte buffer to the storeMessage() method. This is only a shallow copy of the buffer, sharing the same memory allocation, but with its own set of markers. We adjust it so that the duplicate is suitable for reading exactly the part that contains the next message, excluding the null byte. The adjusted buffer will have the following state:

Figure 7: state of ephemeral ByteBuffer duplicate used for message decoding

The storeMessage() method simply reads from the provided buffer from position to limit and decodes the bytes as UTF-8, then stores the string message in an internal queue. Next we reset scanning state and remove the decoded message bytes from the client reception buffer, to free up space:

buffer.limit(buffer.position()).position(i + 1).compact()
i = 0

We set the limit marker to the current position, which at this stage tells us where the client channel stopped writing into the buffer as a whole. The position is then updated to one past the end message null byte marker, and lastly the buffer is compacted. The following figure illustrates the effect of buffer compaction in general:

Figure 8: the effect of the compact() operation

After compaction the buffer is again set up for more writing from the client channel, and we keep only the so far unprocessed bytes, which are copied to the beginning of the buffer. The scanning loop runs until there are no more complete messages present in the buffer, or the end of the client written bytes is reached.

    if (!buffer.hasRemaining()) {
      log("error: client buffer overflow, too big message, discarding buffer")
      buffer.clear()
    }

If no complete message has been read from the buffer and it is also full, the server enforces a limit and discards the buffer, for reasons of simplicity. Another way to handle this would be to dynamically grow the client buffer.

    if (read == -1) {
      if (buffer.position() > 0) {
        storeMessage(buffer.flip())
      }
      cancel()
      channel.close()
    }

Finally, the last part of the SelectionKey.receive() code is responsible for closing client channels, which is triggered by the read call returning -1. If there are remaining unprocessed bytes, they are stored as a last message from the client before the selection key is cancelled and the channel closed. (I am not sure whether it is necessary to do both.)
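
For reference, a storeMessage() along the lines described above could look roughly like this (a sketch based on the description; the actual implementation is in the repository, and how a full queue is handled here is an assumption):

private fun storeMessage(buffer: ByteBuffer) {
    // Decode the bytes between position and limit as UTF-8
    val message = Charsets.UTF_8.decode(buffer).toString()

    // Store in the shared queue; offer() drops the message if the queue is full
    messages.offer(message)
}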

Thread safety

Since only a single thread runs the server code, thread safety is not required there. However, this server allows other threads to concurrently fetch messages from an in-memory queue, and uses java.util.concurrent.ArrayBlockingQueue for this purpose. This allows any number of external threads to safely fetch messages from a single NIOServer instance, concurrently with the server thread adding new incoming messages to the queue.
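
A consumer thread could then fetch messages along these lines (hypothetical sketch; it assumes the server exposes the queue through a method such as takeMessage(), which is not necessarily the actual API in the repository):

Thread {
    while (true) {
        val message = server.takeMessage()   // hypothetical accessor; blocks until a message is available
        println("received: $message")
    }
}.apply {
    isDaemon = true
    start()
}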

Automated tests

Test cases are implemented in NIOServerTests.kt. Perhaps the most interesting case is the last one, which spawns 1000 threads that concurrently connect and send messages, then asserts that the correct number of messages was received by the server.

Notes about performance

As this was only an experiment, not much effort has been put into optimizing the code. The message scanning could be made more efficient by avoiding re-scan from the start of the buffer at every receive(), perhaps by using the mark() method of ByteBuffer to remember the current scan position. Also, the number of buffer compact() operations could be reduced to at most one.

Closing

We will mark the end of this lengthy blog post by showing the code that closes down the server cleanly:

override fun close() {
  selector.close()
  serverSocket.close()
}

When the selector is closed, the server thread stops looping and dies.