diff options
Diffstat (limited to 'node_modules/cacache/lib/content/write.js')
-rw-r--r-- | node_modules/cacache/lib/content/write.js | 205 |
1 files changed, 205 insertions, 0 deletions
diff --git a/node_modules/cacache/lib/content/write.js b/node_modules/cacache/lib/content/write.js new file mode 100644 index 0000000..7146146 --- /dev/null +++ b/node_modules/cacache/lib/content/write.js @@ -0,0 +1,205 @@ +'use strict' + +const events = require('events') + +const contentPath = require('./path') +const fs = require('fs/promises') +const { moveFile } = require('@npmcli/fs') +const { Minipass } = require('minipass') +const Pipeline = require('minipass-pipeline') +const Flush = require('minipass-flush') +const path = require('path') +const ssri = require('ssri') +const uniqueFilename = require('unique-filename') +const fsm = require('fs-minipass') + +module.exports = write + +// Cache of move operations in process so we don't duplicate +const moveOperations = new Map() + +async function write (cache, data, opts = {}) { + const { algorithms, size, integrity } = opts + + if (typeof size === 'number' && data.length !== size) { + throw sizeError(size, data.length) + } + + const sri = ssri.fromData(data, algorithms ? { algorithms } : {}) + if (integrity && !ssri.checkData(data, integrity, opts)) { + throw checksumError(integrity, sri) + } + + for (const algo in sri) { + const tmp = await makeTmp(cache, opts) + const hash = sri[algo].toString() + try { + await fs.writeFile(tmp.target, data, { flag: 'wx' }) + await moveToDestination(tmp, cache, hash, opts) + } finally { + if (!tmp.moved) { + await fs.rm(tmp.target, { recursive: true, force: true }) + } + } + } + return { integrity: sri, size: data.length } +} + +module.exports.stream = writeStream + +// writes proxied to the 'inputStream' that is passed to the Promise +// 'end' is deferred until content is handled. +class CacacheWriteStream extends Flush { + constructor (cache, opts) { + super() + this.opts = opts + this.cache = cache + this.inputStream = new Minipass() + this.inputStream.on('error', er => this.emit('error', er)) + this.inputStream.on('drain', () => this.emit('drain')) + this.handleContentP = null + } + + write (chunk, encoding, cb) { + if (!this.handleContentP) { + this.handleContentP = handleContent( + this.inputStream, + this.cache, + this.opts + ) + } + return this.inputStream.write(chunk, encoding, cb) + } + + flush (cb) { + this.inputStream.end(() => { + if (!this.handleContentP) { + const e = new Error('Cache input stream was empty') + e.code = 'ENODATA' + // empty streams are probably emitting end right away. + // defer this one tick by rejecting a promise on it. + return Promise.reject(e).catch(cb) + } + // eslint-disable-next-line promise/catch-or-return + this.handleContentP.then( + (res) => { + res.integrity && this.emit('integrity', res.integrity) + // eslint-disable-next-line promise/always-return + res.size !== null && this.emit('size', res.size) + cb() + }, + (er) => cb(er) + ) + }) + } +} + +function writeStream (cache, opts = {}) { + return new CacacheWriteStream(cache, opts) +} + +async function handleContent (inputStream, cache, opts) { + const tmp = await makeTmp(cache, opts) + try { + const res = await pipeToTmp(inputStream, cache, tmp.target, opts) + await moveToDestination( + tmp, + cache, + res.integrity, + opts + ) + return res + } finally { + if (!tmp.moved) { + await fs.rm(tmp.target, { recursive: true, force: true }) + } + } +} + +async function pipeToTmp (inputStream, cache, tmpTarget, opts) { + const outStream = new fsm.WriteStream(tmpTarget, { + flags: 'wx', + }) + + if (opts.integrityEmitter) { + // we need to create these all simultaneously since they can fire in any order + const [integrity, size] = await Promise.all([ + events.once(opts.integrityEmitter, 'integrity').then(res => res[0]), + events.once(opts.integrityEmitter, 'size').then(res => res[0]), + new Pipeline(inputStream, outStream).promise(), + ]) + return { integrity, size } + } + + let integrity + let size + const hashStream = ssri.integrityStream({ + integrity: opts.integrity, + algorithms: opts.algorithms, + size: opts.size, + }) + hashStream.on('integrity', i => { + integrity = i + }) + hashStream.on('size', s => { + size = s + }) + + const pipeline = new Pipeline(inputStream, hashStream, outStream) + await pipeline.promise() + return { integrity, size } +} + +async function makeTmp (cache, opts) { + const tmpTarget = uniqueFilename(path.join(cache, 'tmp'), opts.tmpPrefix) + await fs.mkdir(path.dirname(tmpTarget), { recursive: true }) + return { + target: tmpTarget, + moved: false, + } +} + +async function moveToDestination (tmp, cache, sri, opts) { + const destination = contentPath(cache, sri) + const destDir = path.dirname(destination) + if (moveOperations.has(destination)) { + return moveOperations.get(destination) + } + moveOperations.set( + destination, + fs.mkdir(destDir, { recursive: true }) + .then(async () => { + await moveFile(tmp.target, destination, { overwrite: false }) + tmp.moved = true + return tmp.moved + }) + .catch(err => { + if (!err.message.startsWith('The destination file exists')) { + throw Object.assign(err, { code: 'EEXIST' }) + } + }).finally(() => { + moveOperations.delete(destination) + }) + + ) + return moveOperations.get(destination) +} + +function sizeError (expected, found) { + /* eslint-disable-next-line max-len */ + const err = new Error(`Bad data size: expected inserted data to be ${expected} bytes, but got ${found} instead`) + err.expected = expected + err.found = found + err.code = 'EBADSIZE' + return err +} + +function checksumError (expected, found) { + const err = new Error(`Integrity check failed: + Wanted: ${expected} + Found: ${found}`) + err.code = 'EINTEGRITY' + err.expected = expected + err.found = found + return err +} |