Source code

Revision control

Copy as Markdown

Other Tools

/* -*- Mode: indent-tabs-mode: nil; js-indent-level: 2 -*- */
/* vim: set sts=2 sw=2 et tw=80: */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
"use strict";
/* exported Process */
/* import-globals-from subprocess_shared.js */
/* import-globals-from subprocess_shared_unix.js */
/* import-globals-from subprocess_worker_common.js */
importScripts(
);
const POLL_TIMEOUT = 5000;
let io;
let nextPipeId = 0;
class Pipe extends BasePipe {
constructor(process, fd) {
super();
this.process = process;
this.fd = fd;
this.id = nextPipeId++;
}
get pollEvents() {
throw new Error("Not implemented");
}
/**
* Closes the file descriptor.
*
* @param {boolean} [force=false]
* If true, the file descriptor is closed immediately. If false, the
* file descriptor is closed after all current pending IO operations
* have completed.
*
* @returns {Promise<void>}
* Resolves when the file descriptor has been closed.
*/
close(force = false) {
if (!force && this.pending.length) {
this.closing = true;
return this.closedPromise;
}
for (let { reject } of this.pending) {
let error = new Error("File closed");
error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
reject(error);
}
this.pending.length = 0;
if (!this.closed) {
this.fd.dispose();
this.closed = true;
this.resolveClosed();
io.pipes.delete(this.id);
io.updatePollFds();
}
return this.closedPromise;
}
/**
* Called when an error occurred while polling our file descriptor.
*/
onError() {
this.close(true);
this.process.wait();
}
}
class InputPipe extends Pipe {
/**
* A bit mask of poll() events which we currently wish to be notified of on
* this file descriptor.
*/
get pollEvents() {
if (this.pending.length) {
return LIBC.POLLIN;
}
return 0;
}
/**
* Asynchronously reads at most `length` bytes of binary data from the file
* descriptor into an ArrayBuffer of the same size. Returns a promise which
* resolves when the operation is complete.
*
* @param {integer} length
* The number of bytes to read.
*
* @returns {Promise<ArrayBuffer>}
*/
read(length) {
if (this.closing || this.closed) {
throw new Error("Attempt to read from closed pipe");
}
return new Promise((resolve, reject) => {
this.pending.push({ resolve, reject, length });
io.updatePollFds();
});
}
/**
* Synchronously reads at most `count` bytes of binary data into an
* ArrayBuffer, and returns that buffer. If no data can be read without
* blocking, returns null instead.
*
* @param {integer} count
* The number of bytes to read.
*
* @returns {ArrayBuffer|null}
*/
readBuffer(count) {
let buffer = new ArrayBuffer(count);
let read = +libc.read(this.fd, buffer, buffer.byteLength);
if (read < 0 && ctypes.errno != LIBC.EAGAIN) {
this.onError();
}
if (read <= 0) {
return null;
}
if (read < buffer.byteLength) {
return ArrayBuffer_transfer(buffer, read);
}
return buffer;
}
/**
* Called when one of the IO operations matching the `pollEvents` mask may be
* performed without blocking.
*
* @returns {boolean}
* True if any data was successfully read.
*/
onReady() {
let result = false;
let reads = this.pending;
while (reads.length) {
let { resolve, length } = reads[0];
let buffer = this.readBuffer(length);
if (buffer) {
result = true;
this.shiftPending();
resolve(buffer);
} else {
break;
}
}
if (!reads.length) {
io.updatePollFds();
}
return result;
}
}
class OutputPipe extends Pipe {
/**
* A bit mask of poll() events which we currently wish to be notified of on
* this file discriptor.
*/
get pollEvents() {
if (this.pending.length) {
return LIBC.POLLOUT;
}
return 0;
}
/**
* Asynchronously writes the given buffer to our file descriptor, and returns
* a promise which resolves when the operation is complete.
*
* @param {ArrayBuffer} buffer
* The buffer to write.
*
* @returns {Promise<integer>}
* Resolves to the number of bytes written when the operation is
* complete.
*/
write(buffer) {
if (this.closing || this.closed) {
throw new Error("Attempt to write to closed pipe");
}
return new Promise((resolve, reject) => {
this.pending.push({ resolve, reject, buffer, length: buffer.byteLength });
io.updatePollFds();
});
}
/**
* Attempts to synchronously write the given buffer to our file descriptor.
* Writes only as many bytes as can be written without blocking, and returns
* the number of byes successfully written.
*
* Closes the file descriptor if an IO error occurs.
*
* @param {ArrayBuffer} buffer
* The buffer to write.
*
* @returns {integer}
* The number of bytes successfully written.
*/
writeBuffer(buffer) {
let bytesWritten = libc.write(this.fd, buffer, buffer.byteLength);
if (bytesWritten < 0 && ctypes.errno != LIBC.EAGAIN) {
this.onError();
}
return bytesWritten;
}
/**
* Called when one of the IO operations matching the `pollEvents` mask may be
* performed without blocking.
*/
onReady() {
let writes = this.pending;
while (writes.length) {
let { buffer, resolve, length } = writes[0];
let written = this.writeBuffer(buffer);
if (written == buffer.byteLength) {
resolve(length);
this.shiftPending();
} else if (written > 0) {
writes[0].buffer = buffer.slice(written);
} else {
break;
}
}
if (!writes.length) {
io.updatePollFds();
}
}
}
class Signal {
constructor(fd) {
this.fd = fd;
}
cleanup() {
libc.close(this.fd);
this.fd = null;
}
get pollEvents() {
return LIBC.POLLIN;
}
/**
* Called when an error occurred while polling our file descriptor.
*/
onError() {
io.shutdown();
}
/**
* Called when one of the IO operations matching the `pollEvents` mask may be
* performed without blocking.
*/
onReady() {
let buffer = new ArrayBuffer(16);
let count = +libc.read(this.fd, buffer, buffer.byteLength);
if (count > 0) {
io.messageCount += count;
}
}
}
class Process extends BaseProcess {
/**
* Each Process object opens an additional pipe from the target object, which
* will be automatically closed when the process exits, but otherwise
* carries no data.
*
* This property contains a bit mask of poll() events which we wish to be
* notified of on this descriptor. We're not expecting any input from this
* pipe, but we need to poll for input until the process exits in order to be
* notified when the pipe closes.
*/
get pollEvents() {
if (this.exitCode === null) {
return LIBC.POLLIN;
}
return 0;
}
/**
* Kills the process with the given signal.
*
* @param {integer} signal
*/
kill(signal) {
libc.kill(this.pid, signal);
this.wait();
}
/**
* Initializes the IO pipes for use as standard input, output, and error
* descriptors in the spawned process.
*
* @param {object} options
* The Subprocess options object for this process.
* @returns {unix.Fd[]}
* The array of file descriptors belonging to the spawned process.
*/
initPipes(options) {
let stderr = options.stderr;
let our_pipes = [];
let their_pipes = new Map();
let pipe = input => {
let fds = ctypes.int.array(2)();
let res = libc.pipe(fds);
if (res == -1) {
throw new Error("Unable to create pipe");
}
fds = Array.from(fds, unix.Fd);
if (input) {
fds.reverse();
}
if (input) {
our_pipes.push(new InputPipe(this, fds[1]));
} else {
our_pipes.push(new OutputPipe(this, fds[1]));
}
libc.fcntl(fds[0], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
libc.fcntl(fds[1], LIBC.F_SETFD, LIBC.FD_CLOEXEC);
libc.fcntl(fds[1], LIBC.F_SETFL, LIBC.O_NONBLOCK);
return fds[0];
};
their_pipes.set(0, pipe(false));
their_pipes.set(1, pipe(true));
if (stderr == "pipe") {
their_pipes.set(2, pipe(true));
} else if (stderr == "stdout") {
their_pipes.set(2, their_pipes.get(1));
}
// Create an additional pipe that we can use to monitor for process exit.
their_pipes.set(3, pipe(true));
this.fd = our_pipes.pop().fd;
this.pipes = our_pipes;
return their_pipes;
}
spawn(options) {
let fds = this.initPipes(options);
let launchOptions = {
environment: options.environment,
disclaim: options.disclaim,
fdMap: [],
};
// Check for truthiness to avoid chdir("null")
if (options.workdir) {
launchOptions.workdir = options.workdir;
}
for (let [dst, src] of fds.entries()) {
launchOptions.fdMap.push({ src, dst });
}
try {
this.pid = IOUtils.launchProcess(options.arguments, launchOptions);
} finally {
for (let fd of new Set(fds.values())) {
fd.dispose();
}
}
}
/**
* Called when input is available on our sentinel file descriptor.
*
* @see pollEvents
*/
onReady() {
// We're not actually expecting any input on this pipe. If we get any, we
// can't poll the pipe any further without reading it.
if (this.wait() == undefined) {
this.kill(9);
}
}
/**
* Called when an error occurred while polling our sentinel file descriptor.
*
* @see pollEvents
*/
onError() {
this.wait();
}
/**
* Attempts to wait for the process's exit status, without blocking. If
* successful, resolves the `exitPromise` to the process's exit value.
*
* @returns {integer|null}
* The process's exit status, if it has already exited.
*/
wait() {
if (this.exitCode !== null) {
return this.exitCode;
}
let status = ctypes.int();
let res = libc.waitpid(this.pid, status.address(), LIBC.WNOHANG);
// If there's a failure here and we get any errno other than EINTR, it
// means that the process has been reaped by another thread (most likely
// the nspr process wait thread), and its actual exit status is not
// available to us. In that case, we have to assume success.
if (res == 0 || (res == -1 && ctypes.errno == LIBC.EINTR)) {
return null;
}
let sig = unix.WTERMSIG(status.value);
if (sig) {
this.exitCode = -sig;
} else {
this.exitCode = unix.WEXITSTATUS(status.value);
}
this.fd.dispose();
io.updatePollFds();
this.resolveExit(this.exitCode);
return this.exitCode;
}
}
io = {
pollFds: null,
pollHandlers: null,
pipes: new Map(),
processes: new Map(),
messageCount: 0,
running: true,
polling: false,
init(details) {
this.signal = new Signal(details.signalFd);
this.updatePollFds();
setTimeout(this.loop.bind(this), 0);
},
shutdown() {
if (this.running) {
this.running = false;
this.signal.cleanup();
this.signal = null;
self.postMessage({ msg: "close" });
self.close();
}
},
getPipe(pipeId) {
let pipe = this.pipes.get(pipeId);
if (!pipe) {
let error = new Error("File closed");
error.errorCode = SubprocessConstants.ERROR_END_OF_FILE;
throw error;
}
return pipe;
},
getProcess(processId) {
let process = this.processes.get(processId);
if (!process) {
throw new Error(`Invalid process ID: ${processId}`);
}
return process;
},
updatePollFds() {
let handlers = [
this.signal,
...this.pipes.values(),
...this.processes.values(),
];
handlers = handlers.filter(handler => handler.pollEvents);
// Our poll loop is only useful if we've got at least 1 thing to poll other than our own
// signal.
if (handlers.length == 1) {
this.polling = false;
} else if (!this.polling && this.running) {
// Restart the poll loop if necessary:
setTimeout(this.loop.bind(this), 0);
this.polling = true;
}
let pollfds = unix.pollfd.array(handlers.length)();
for (let [i, handler] of handlers.entries()) {
let pollfd = pollfds[i];
pollfd.fd = handler.fd;
pollfd.events = handler.pollEvents;
pollfd.revents = 0;
}
this.pollFds = pollfds;
this.pollHandlers = handlers;
},
loop() {
this.poll();
if (this.running && this.polling) {
setTimeout(this.loop.bind(this), 0);
}
},
poll() {
let handlers = this.pollHandlers;
let pollfds = this.pollFds;
let timeout = this.messageCount > 0 ? 0 : POLL_TIMEOUT;
let count = libc.poll(pollfds, pollfds.length, timeout);
for (let i = 0; count && i < pollfds.length; i++) {
let pollfd = pollfds[i];
if (pollfd.revents) {
count--;
let handler = handlers[i];
try {
let success = false;
if (pollfd.revents & handler.pollEvents) {
success = handler.onReady();
}
// Only call the error handler in this iteration if we didn't also
// have a success. This is necessary because Linux systems set POLLHUP
// on a pipe when it's closed but there's still buffered data to be
// read, and Darwin sets POLLIN and POLLHUP on a closed pipe, even
// when there's no data to be read.
if (
!success &&
pollfd.revents & (LIBC.POLLERR | LIBC.POLLHUP | LIBC.POLLNVAL)
) {
handler.onError();
}
} catch (e) {
console.error(e);
debug(`Worker error: ${e} :: ${e.stack}`);
handler.onError();
}
pollfd.revents = 0;
}
}
},
addProcess(process) {
this.processes.set(process.id, process);
for (let pipe of process.pipes) {
this.pipes.set(pipe.id, pipe);
}
},
cleanupProcess(process) {
this.processes.delete(process.id);
},
};