25 November 2023

Simple single-process file locking/queueing in Node

In a single-process application with asynchronous IO, such as a Node-based text editor, you need some form of file locking to prevent race conditions when writing files such as JSON configs.

This is because there is no built-in locking or atomicity in Node’s filesystem functions, so you can get situations like this:

  1. Event A fires, triggering a config file write. writeFile (A) is called and Node starts writing to disk.

  2. Event B fires, triggering another write. writeFile is called again (B), before write A has returned.

  3. The file ends up corrupted, with some data from write A and some from write B.

We can also see the following:

  1. Some code starts reading a file.

  2. Another event fires and starts writing to the file while it’s being read.

  3. The read sees a mixture of the file with and without the writes.

So both reads and writes must be queued.
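The interleaving in the first scenario can be reproduced without touching the disk. The following is a deliberately simplified simulation, not a description of how Node's filesystem functions work internally: fakeFile, yieldToEventLoop and fakeWriteFile are hypothetical names, and the "file" is just a string that gets written in small chunks, with other events allowed to run between chunks.

```javascript
// In-memory stand-in for a file on disk (an assumption for illustration)
let fakeFile = "";

// let other events run, as real asynchronous IO does between chunks
let yieldToEventLoop = () => new Promise(resolve => setTimeout(resolve, 0));

// hypothetical writer: truncates the "file", then appends the data in
// 4-character chunks, yielding before each one
async function fakeWriteFile(data) {
	fakeFile = "";
	
	for (let i = 0; i < data.length; i += 4) {
		await yieldToEventLoop();
		fakeFile += data.slice(i, i + 4);
	}
}

// two "events" fire back to back; neither write waits for the other
let raceDone = Promise.all([
	fakeWriteFile('{"theme":"dark"}'),
	fakeWriteFile('{"theme":"light"}'),
]).then(() => fakeFile);

raceDone.then(contents => {
	// chunks from both writes are interleaved in the result
	console.log(contents);
});
```

Neither write is wrong on its own; the problem only appears because the second one starts before the first has finished.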

Luckily, we don’t need a full-fledged locking mechanism for this situation because we don’t need to worry about truly parallel processes interacting with the files. We can implement a simple queue, and JavaScript’s single-threaded event loop model ensures that updates to the queue’s state are atomic, avoiding the kind of fine-grained race conditions we would have to account for in a multi-process environment.
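To make the "atomic updates" point concrete, here is a minimal sketch of a per-path queue built from a promise chain. This is a more compact technique than an explicit array of tasks, shown only for illustration; enqueue, tails and the timer-based tasks standing in for file IO are all hypothetical. The important part is that looking up the current tail and replacing it happen in one synchronous stretch, so no other event can slip in between.

```javascript
// one promise per path, representing "everything queued so far"
let tails = new Map();

// hypothetical helper: run task() once all earlier tasks for path settle
function enqueue(path, task) {
	// read and replace the tail synchronously; nothing can interleave here
	let previous = tails.get(path) ?? Promise.resolve();
	let result = previous.then(() => task());
	
	// the tail never rejects, so one failed task doesn't block later ones
	let tail = result.catch(() => {});
	
	tails.set(path, tail);
	
	// forget the path once nothing else has been queued behind this task
	tail.then(() => {
		if (tails.get(path) === tail) {
			tails.delete(path);
		}
	});
	
	return result;
}

// demo with timers standing in for slow file operations
let log = [];
let sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

let queueDone = Promise.all([
	enqueue("config.json", async () => {
		log.push("A start");
		await sleep(20);
		log.push("A end");
	}),
	enqueue("config.json", async () => {
		log.push("B start");
		await sleep(5);
		log.push("B end");
	}),
]).then(() => log);
```

Even though task B is much quicker than task A, it doesn't start until A has finished, because both were queued against the same path.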

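As a simple optimisation, if there is already a read in the queue when read() is called again, the caller just gets the existing read's promise. One caveat is that the shared read may sit ahead of a write that was queued later, in which case the caller won't see that write's data.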

As a rough illustration, the code I have currently works like this (untested):

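let fs = require("fs/promises");

// one queue per file path
let queues = {};

let api = {
	// get the queue for a path, creating it if it doesn't exist yet
	getQueue(path) {
		if (!queues[path]) {
			queues[path] = [];
		}
		
		return queues[path];
	},
	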
	async read(path) {
		/*
		reads can share a task, as there's no point reading the same data
		twice
		*/
		
		let existingTask = queues[path]?.find(task => task.type === "read");
		
		if (existingTask) {
			return existingTask.promise;
		}
		
		let task = {
			type: "read",
			promise: promiseWithMethods(),
			inProgress: false,
		};
		
		api.getQueue(path).push(task);
		
		api.checkQueue(path);
		
		return task.promise;
	},
	
	async write(path, data) {
		let task = {
			type: "write",
			data,
			promise: promiseWithMethods(),
			inProgress: false,
		};
		
		api.getQueue(path).push(task);
		
		api.checkQueue(path);
		
		return task.promise;
	},
	
	async checkQueue(path) {
		let queue = queues[path];
		let task = queue[0];
		
		if (task.inProgress) {
			// if there is a task in progress, we'll recur once it's done
			// to process the next task
			
			return;
		}
		
		// we have a task waiting and it hasn't been started yet; start it
		
		task.inProgress = true;
		
		try {
			if (task.type === "read") {
				task.promise.resolve(await api._read(path));
			} else {
				task.promise.resolve(await api._write(path, task.data));
			}
			
			// finally clause below is executed before any other promise
			// callbacks are called/awaits resumed
		} catch (e) {
			task.promise.reject(e);
		} finally {
			queue.shift();
			
			if (queue.length > 0) {
				api.checkQueue(path);
			} else {
				delete queues[path];
			}
		}
	},
	
	async _read(path) {
		return (await fs.readFile(path)).toString();
	},
	
	async _write(path, data) {
		return await fs.writeFile(path, data);
	}
}

/*
util - Promise that can be resolved/rejected from outside
*/

function promiseWithMethods() {
	let resolve;
	let reject;
	
	let promise = new Promise(function(res, rej) {
		resolve = res;
		reject = rej;
	});
	
	promise.resolve = resolve;
	promise.reject = reject;
	
	return promise;
}

One subtlety that has to be considered when writing the logic above is the order in which promise callbacks are called, or awaits are resumed, when promises are settled. Fortunately, in JavaScript this works as you would hope: settling a promise doesn't run its callbacks immediately. They are queued in the order they were added and only run once the currently executing synchronous code has finished. In checkQueue, the call to task.promise.resolve() (or reject()) and the finally block run in one synchronous stretch, so the task is removed from the queue before any code awaiting the task's promise is resumed. This means that from the perspective of code outside of checkQueue, there is never a state where a task is complete but hasn’t been removed from the queue yet.
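The ordering can be checked in isolation with a small sketch. The names here (order, resolveTask, taskPromise) are made up for the example; the synchronous push after resolveTask() plays the role of the queue.shift() in the finally block.

```javascript
let order = [];

// a promise that can be resolved from outside
let resolveTask;
let taskPromise = new Promise(resolve => {
	resolveTask = resolve;
});

// callers that were already waiting on the task
taskPromise.then(() => order.push("first callback"));
taskPromise.then(() => order.push("second callback"));

// settle the promise, then do the bookkeeping synchronously
resolveTask();
order.push("bookkeeping");

// callbacks run only after the synchronous code above has finished,
// and in the order they were added
let orderingDone = taskPromise.then(() => order);

orderingDone.then(result => console.log(result.join(", ")));
// prints "bookkeeping, first callback, second callback"
```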