Squadrick Whatever works

Interprocess Locks

Interprocess message passing is an important tool, especially when developing large complex systems like robots. ROS is commonly used for pretty much every small/medium scale robot, and it’s no different at Project MANAS where we were using ROS across the board. ROS uses a publish-subscribe paradigm for communicating between messages, and internally it uses either TCP/IP or UDP, which are both quite slow compared to using, say, the system’s shared memory.

I decided to build a small simple IPC library that uses the same pub-sub pattern, but using shared memory instead. I call this library Shadesmar, and it’s mostly used for transporting large messages (pointclouds, images) between processes and it works quite well with our pre-existing ROS codebase.

The reason I implemented the library from scratch instead of using Boost’s message_queue was that we needed the ability to do memory copies from GPU to shared memory and vice-versa, without the need to do an intermediate copy to the process’s memory space.

So we can achieve:

GPU -> Shared Memory -> GPU

instead of

GPU -> Process Memory -> Shared Memory -> Process Memory -> GPU

The implementation of Shadesmar (shm) is quite simple. We allocate a large chunk of system memory using the topic name as the key, and the memory is used as a circular buffer. Each publisher (writer) writes at the last index position, overwriting older pre-existing data. The subscribes regularly poll to know if there’s been a new message and copy the message if there’s a new message. Alternatively, I provide the ability to do serialization using the excellent library msgpack. You can find the full details of the implementation in the GitHub repo, but moving onto the title of the blog post: Interprocess Locks

Interprocess Synchronization

Since multiple publishers and subscribers have the ability to read and write from the same memory buffer, there’s a possibility of data corruption if the reads and writes overlap. To prevent this, each publisher acquires an exclusive lock for the buffer, writes the data and unlock the lock. Since the subscribers are read-only, they acquire a shared lock on the buffer, read the data and unlock the lock.

Boost has an interprocess sharable mutex that can be stored on shared memory, and it is perfect for this.

Psuedo-code for reading and writing:

// buffer is the shared memory circular queue

void read(MessageType *msg, int idx) {
  ipc_mut->lock_sharable();
  memcpy(buffer[idx], msg, sizeof(MessageType));
  ipc_mut->unlock_sharable();
}

void write(MessageType *msg) {
  int idx = get_last_idx();
  ipc_mut->lock();
  memcpy(msg, buffer[idx], sizeof(MessageType));
  ipc_mut->unlock();
}

This would work fine as long as we can assume that no process involved in this communication will crash or be killed. This, unfortunately, is not something we can guarantee, as simply Ctrl+Cing a running process will cause this to fail.

Failure

If a process dies while having exclusive or shared access to ipc_mut and it dies, then any other process trying to acquire this process will starve, causing the system to halt. We could assume that if a process fails to acquire access to ipc_mut in a small time span, the process previously holding ipc_mut is now dead so we forcefully acquire access and proceed with the read/write operation. However, this could lead to overlaps if the operation of the previous process took longer than the time span we waited.

There are a number of ways to tackle this problem, and I’ll be first talking about my approach, followed by other approaches.

Approach

I create a new lock type called robust_ipc_mutex that can handle the death of underlying processes. It keeps track of PID of the last process that acquired it, and if another process can’t acquire it, it can check if the PID is still alive using stat(), if it’s dead it will unlock the mutex and reset the field of the previous owner.

class robust_ipc_mutex {
public:  
  robust_ipc_mutex() = default;

  void lock() {
    // in a spin lock until we get access
    while(!mut.try_lock()) {
      if (exclusive_owner != 0) {
        if (proc_dead(exclusive_owner)) {
          exclusive_owner = 0;
          mut.unlock();
          continue;
        }
      }
      std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
    }
    exclusive_owner = getpid();
  }

  void unlock() {
    exclusive_owner = 0;
    mut.unlock();
  }

private:
  interprocess_sharable_mutex mut;
  __pid_t exclusive_owner{0};
}

Although the above code looks correct, it can lead to faults by unlocking mut more than necessary. Assume the following order of execution by 2 processes, when they call lock(), and exclusive_owner is dead.

// TIMESTEP 1
while(!mut.try_lock()) {                //<-- proc 1 (enters the loop)
  if (exclusive_owner != 0) {           //<-- proc 2 (enters the if)
    if (proc_dead(exclusive_owner)) {
      exclusive_owner = 0;
      mut.unlock();
      continue;
    }
  }
  std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
}


// TIMESTEP 2
while(!mut.try_lock()) {                 
  if (exclusive_owner != 0) {           //<-- proc 1 (enters the if)
    if (proc_dead(exclusive_owner)) {   //<-- proc 2 (enters the if, since exclusive_owner is dead)
      exclusive_owner = 0;
      mut.unlock();
      continue;
    }
  }
  std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
}


// TIMESTEP 3
while(!mut.try_lock()) {                 
  if (exclusive_owner != 0) {           
    if (proc_dead(exclusive_owner)) {   //<-- proc 1 (enters the if, if proc 1 runs before proc 2)
      exclusive_owner = 0;              //<-- proc 2 (might run after proc 1)
      mut.unlock();
      continue;
    }
  }
  std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
}

After timestep 3, both proc 1 and 2 call mut.unlock() which is incorrect. Further, exclusive_owner is reset to 0 twice instead of once as expected. This condition arises dues to the lack of atomicity of the operations leading to interleaved execution. We can tackle the issue by using C++’s atomic functionality.

Instead of simply assigning exclusive_owner = 0, we use a CAS to ensure atomicity across processes.

class robust_ipc_mutex {
public:  
  robust_ipc_mutex() = default;

  void lock() {
    // in a spin lock until we get access
    while(!mut.try_lock()) {
      if (exclusive_owner != 0) {
        auto ex_proc = exclusive_owner.load(); // atomic load
        if (proc_dead(ex_proc)) {
          if (exclusive_owner.compare_exchange_strong(ex_proc, 0)) {
            // ensures that the process which we checked for liveness
            // is the same as the value we're replacing

            // if the condition returns false
            // exclusive_owner was reset by some other process
            mut.unlock();
            continue;
          }
        }
      }
      std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
    }
    exclusive_owner = getpid();
  }

  void unlock() {
    auto current_pid = getpid();
    if (exclusive_owner.compare_exchange_strong(current_pid, 0)) {
      mut.unlock();
    }
  }

private:
  interprocess_sharable_mutex mut;
  std::atomic<__pid_t> exclusive_owner{0};
}

Using atomic, we can guarantee that lock() and unlock() work even if the process dies. This only tackles the problem of exclusive access where only one process can hold the lock. We also need to add the ability to have shared access (lock_sharable(), unlock_sharable()). This is a little tough since we need to keep track of multiple shared processes instead of a single exclusive process, and like above, we need to ensure that the data structure we use to store the shared processes are correct under concurrent use without the use of any locks. I’ll build a simple lock-less linear set for this. This is a great video of building a lock-free hash table which is where I started off when learning how to build this data structure.

Lockless Set

The set needs to be able to insert PIDs, delete PIDs, and traverse through the set.

template <uint32_t max_size>
class LocklessSet {
public:
  LocklessSet();

  void insert(uint32_t);

  bool remove(uint32_t);

  std::atomic<uint32_t> pids[max_size];
}

(NOTE: I used a linear set since max_size is very small (32), but the implementation can be easily expanded to make a hash-set with closed hashing.)

The constructor initializes pids to the default value of 0.

LocklessSet() {
  std::memset(pids, 0, max_size);
}

insert() and remove() use atomic CAS for inserting the elements concurrently.

bool insert(uint32_t elem) {
  for (uint32_t idx = 0; idx < size; ++idx) {
    auto probedElem = pids[idx].load();

    if (probedElem != elem) {
      // The entry is either free or contains another key
      if (probedElem != 0) {
        continue; // contains another key
      }
      // Entry is free, time for CAS
      // probedKey or pids[idx] is expected to be zero
      uint32_t exp = 0;
      if (pids[idx].compare_exchange_strong(exp, elem)) {
        // successfully insert the element
        return true;
      } else {
        // some other proc got to it before us, continue searching
        continue;
      }
    }
    // no space in the set
    return false;
  }
}

bool remove(uint32_t elem) {
  for (uint32_t idx = 0; idx < size; ++idx) {
    auto probedElem = pids[idx].load();

    if (probedElem == elem) {
      return pids[idx].compare_exchange_strong(elem, 0);
      // if true, we successfully do a CAS and the element was deleted by current proc
      // if false, some other proc deleted the element before
    }
  }

  // we exit after doing a full pass through the array
  // but we failed to delete an element, maybe already deleted
  return false;
}

We can use LocklessSet to build lock_sharable() and unlock_sharable() into robust_ipc_mutex. We also need to modify lock() to take the shared processes into account.

class robust_ipc_mutex {
public:  
  robust_ipc_mutex() = default;

  void lock();

  void unlock();

  void lock_sharable();

  void unlock_sharable();

private:
  void prune_sharable_procs();

  LocklessSet<32> shared_owners;
  interprocess_sharable_mutex mut;
  __pid_t exclusive_owner{0};
}

prune_sharable_procs() iterates through the set and atomically removes dead processes that are holding a shared access on the lock.

void prune_sharable_procs() {
  for (auto &i : shared_owners.__array) {
    uint32_t shared_owner = i.load();

    if (shared_owner == 0)
      continue;
    if (proc_dead(shared_owner)) {
      if (shared_owners.remove(shared_owner)) {
        // removal of element was a success
        // this ensures no duplicate deletion
        mutex_.unlock_sharable();
      }
    }
  }
}

In lock() if we fail to get exclusive access, and exclusive_owner = 0, we can prune_sharable_procs() to remove any dead processes.

void lock() {
  while (!mutex_.try_lock()) {
    // failed to get mutex_ within timeout,
    // so mutex_ is either held properly
    // or some process which holds it has died
    if (exclusive_owner != 0) {
      // exclusive_owner is not default value, some other proc
      // has access already
      auto ex_proc = exclusive_owner.load();
      if (proc_dead(ex_proc)) {
        // ex_proc is dead, we unlock
        // and continue immediately to next loop
        if (exclusive_owner.compare_exchange_strong(ex_proc, 0)) {
          mutex_.unlock();
          continue;
        }
      }
    } else {
      // exclusive_owner = 0, so the writers are blocking us
      prune_sharable_procs();
    }
    std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
  }

  // only a single proc can get here at a time
  exclusive_owner = getpid();
}

lock_sharable() is similar to lock(), but we only need to check for exclusive_owner.

void lock_sharable() {
  while (!mutex_.try_lock_sharable()) {
    // only reason for failure is that exclusive lock is held
    if (exclusive_owner != 0) {
      auto ex_proc = exclusive_owner.load();
      if (proc_dead(ex_proc)) {
        // exclusive_owner is dead
        if (exclusive_owner.compare_exchange_strong(ex_proc, 0)) {
          exclusive_owner = 0;
          mutex_.unlock();
        }
      }
    }
    std::this_thread::sleep_for(std::chrono::microseconds(TIMEOUT));
  }

  // loop until we insert our
  while(!shared_owners.insert(getpid()));
}

We use LocklessSet’s remove() to delete the current PID when unlock_sharable() is called.

void unlock_sharable() {
  if (shared_owners.remove(getpid())) {
    mutex_.unlock_sharable();
  }
}

robust_ipc_mutex can be placed in the shared memory segment that you’re trying to synchronize.

Performance

I created a benchmark to calculate the time for various operations compared to a basic interprocess_sharable_mutex. robust_ipc_mutex can handle up to 64 shared processes and TIMEOUT is set as 1 μs.

Locking and unlocking continuously for 1000000 times:

Base mutex: 95656.8 ± 9585.832 μs
Robust mutex: 1176544.0 ± 19633.354 μs

Shared locking and unlocking continuously for 1000000 times:

Base mutex: 80629.0 ± 3651.181 μs
Robust mutex: 1172940.6 ± 7039.073 μs

robust_ipc_mutex is 12-15x slower compared to the underlying base mutex.

Within shadesmar we replaced the base mutex with the robust mutex, the number of messages of size 7 bytes written in 1 ms (1 publisher, 1 subscriber) are reported below:

Base mutex: 206335.2 ± 5148.852
Robust mutex: 167751.4 ± 1717.818

robust_ipc_mutex is around 1.25x slower than the base mutex.

From synthetic benchmarks, it seems that robust_ipc_mutex is quite slow compared to using the base mutex, but in a more realistic scenario the difference is not found at all.

Publishing message of size 10000000 bytes from 1 publisher to 8 subscribers with message serialization (using msgpack). Below are the messages sent in 1 s:

Base mutex: 533.0 ± 34.438
Modified mutex: 521.0 ± 10.358

The difference between the two is far smaller, and since the typical message size that Shadesmar will transport will be in the order of several megabytes (uncompressed images/pointclouds/tensors), the time trade-off is worth the robustness.


Other Approaches

This isn’t the only way to solve this problem. While researching if someone else has already tackled a similar problem, I stumbled upon other possible solutions:

  1. POSIX’s pthread can be made robust using pthread_mutexattr_setrobust(), where if a thread holding the mutex dies, a waiting thread receives EOWNERDEAD. The waiting thread acquires the mutex, and it is its responsibility to make the mutex consistent using pthread_mutex_consistent(). By default, pthreads can’t be used between process via shared memory, but it can be set to work in shared memory using pthread_mutexattr_setpshared(). pthreads support very basic sync mechanism and a sharable mutex should be built using a pthread taking EOWNERDEAD into consideration. I’m currently working on this as an alternative robust IPC lock, and I’ll make a blog post when I’m done. You can read about the solution (and its failure) here.

  2. Using a centralized process for tracking processes. We register each process using the mutex with the master. The maser creates a non-blocking TCP socket with a registering process. The master will try to read data from the socket. If the other process is read, it reads 0 bytes. If the other process is alive, it gets EWOULDBLOCK. The master would regularly check that all registered processes are alive, and clean up any dead process and make the mutex consistent.


The header file with the actual implementation can be found here: robust_lock.h

Discussion: Hacker News, Lobsters