diff --git a/src/data-index.js b/src/data-index.js index 5cf89b1..bde895d 100644 --- a/src/data-index.js +++ b/src/data-index.js @@ -455,124 +455,63 @@ class DataIndex { return this._updateTree(path, keyValues.oldValue, keyValues.newValue, recordPointer, recordPointer, metadata); } - // /** - // * - // * @param {string} path - // * @param {any} oldValue - // * @param {any} newValue - // */ - // handleRecordUpdate(path, oldValue, newValue) { - - // const keyValues = getChildValues(this.key, oldValue, newValue); - // const canBeIndexed = ['number','boolean','string'].indexOf(typeof keyValues.newValue) >= 0 || keyValues.newValue instanceof Date; - - // const includedValues = this.includeKeys.map(key => getChildValues(key, oldValue, newValue)); - // const keyValueChanged = compareValues(keyValues.oldValue, keyValues.newValue) !== 'identical'; - // const includedValuesChanged = includedValues.some(values => compareValues(values.oldValue, values.newValue) !== 'identical'); - - // if (!keyValueChanged && !includedValuesChanged) { - // return; - // } - - // const startTime = Date.now(); - // const updatedKey = getPathInfo(path).key; - // // const pathKeys = getPathKeys(path); - // // const indexKeys = getPathKeys(this.path); - // // const wildcardKeys = indexKeys.reduce((wildcards, key, i) => { - // // if (key === '*') { wildcards.push(pathKeys[i]); } - // // return wildcards; - // // }, []); - // const wildcardKeys = this._getWildcardKeys(path); - // const recordPointer = _createRecordPointer(wildcardKeys, updatedKey); - // const metadata = (() => { - // const obj = {}; - // this.includeKeys.forEach(key => obj[key] = newValue[key]); - // return obj; - // })(); - - // // debug.log(`Requesting update lock on index ${this.description}`.blue); - // let lock; - // return this._lock(true, `index.handleRecordUpdate "/${path}"`) - // .then(l => { - // // debug.log(`Got update lock on index ${this.description}`.blue, l); - // lock = l; - // return this._getTree(); - // }) - // .then(idx => { - // /** - // * @type BinaryBPlusTree - // */ - // const tree = idx.tree; - // // const oldEntry = tree.find(keyValues.oldValue); - // const ops = []; - // if (keyValues.oldValue !== null) { - // //ops.push({ type: 'remove', key: keyValues.oldValue, value: recordPointer }) - // let op = BinaryBPlusTree.TransactionOperation.remove(keyValues.oldValue, recordPointer); - // ops.push(op); - // } - // if (keyValues.newValue !== null && canBeIndexed) { - // // ops.push({ type: 'add', key: keyValues.newValue, value: recordPointer, metadata }); - // let op = BinaryBPlusTree.TransactionOperation.add(keyValues.newValue, recordPointer, metadata); - // ops.push(op); - // } - // return tree.transaction(ops) - // .then(() => { - // // Index updated - // idx.close(); - // return false; // not rebuilt - // }) - // .catch(err => { - // // Could not update index --> leaf full? - // debug.log(`Could not update index ${this.description}: ${err.message}`.yellow); - - // // Rebuild it by getting current content - // return tree.toTreeBuilder(FILL_FACTOR) - // .then(builder => { - // idx.close(); - - // // Reprocess the changes - // if (keyValues.oldValue !== null) { - // builder.remove(keyValues.oldValue, recordPointer); - // } - // if (keyValues.newValue !== null && canBeIndexed) { - // builder.add(keyValues.newValue, recordPointer, metadata); - // } - // return Uint8Array.from(builder.create().toBinary(true)); - // }) - // .then(binary => { - // // overwrite the file - // return new Promise((resolve, reject) => { - // fs.writeFile(this.fileName, Buffer.from(binary.buffer), (err) => { - // if (err) { - // debug.error(err); - // reject(err); - // } - // else { - // resolve(); - // } - // }); - // }); - // }) - // .then(() => { - // return true; // rebuilt - // }); - // }) - // .then(rebuilt => { - // const doneTime = Date.now(); - // const duration = Math.round((doneTime - startTime) / 1000); - // debug.log(`Index ${this.description} was ${rebuilt ? 'rebuilt' : 'updated'} successfully for "/${path}", took ${duration} seconds`.green); - // }); - // }) - // .then(() => { - // // debug.log(`Released update lock on index ${this.description}`.blue); - // lock.release(); - // }); - // } - _lock(forWriting, comment) { - const tid = ID.generate(); // forWriting ? "write-index" : "read-index"; - let lockPath = `__index__/${this.path.replace(/\*/g, '__')}/__/${this.key}`; - return IndexLock.lock(lockPath, tid, forWriting, comment, { noTimeout: true }); + if (!this._lockQueue) { this._lockQueue = []; } + if (!this._lockState) { + this._lockState = { + isLocked: false, + forWriting: undefined, + comment: undefined + }; + } + + const lock = { forWriting, comment, release: comment => { + const pending = []; + while (true) { + if (this._lockQueue.length === 0) { break; } + const next = this._lockQueue[0]; + if (next.forWriting) { + if (pending.length === 0) { + pending.push(next); + this._lockQueue.shift(); + } + break; + } + else { + pending.push(next); + this._lockQueue.shift(); + } + } + if (pending.length === 0) { + this._lockState.isLocked = false; + this._lockState.forWriting = undefined; + this._lockState.comment = undefined; + } + else { + this._lockState.forWriting = pending[0].forWriting; + this._lockState.comment = ''; + } + for (let i = 0; i < pending.length; i++) { + const lock = pending[i]; + if (this._lockState.comment.length > 0) { this._lockState.comment += ' && '} + this._lockState.comment += lock.comment; + lock.resolve(lock); + } + }}; + if (this._lockState.isLocked) { + // Queue lock request + this._lockQueue.push(lock); + return new Promise(resolve => { + lock.resolve = resolve; + }); + } + else { + // No current lock, allow + this._lockState.isLocked = true; + this._lockState.forWriting = forWriting; + this._lockState.comment = comment; + return Promise.resolve(lock); + } } count(op, val) { @@ -1306,145 +1245,6 @@ class IndexQueryResults extends Array { }); return IndexQueryResults.from(filtered, this.filterKey); } - -} - -class IndexLock { - - static get LOCK_STATE() { - return { - PENDING: 'pending', - LOCKED: 'locked', - EXPIRED: 'expired', - DONE: 'done' - }; - }; - - /** - * Constructor for a record lock - * @param {Storage} storage - * @param {string} path - * @param {string} tid - * @param {boolean} forWriting - * @param {boolean} priority - */ - constructor(storage, path, tid, forWriting, priority = false) { - this.tid = tid; - this.path = path; - this.forWriting = forWriting; - this.priority = priority; - this.state = IndexLock.LOCK_STATE.PENDING; - this.storage = storage; - this.requested = Date.now(); - this.granted = undefined; - this.expires = undefined; - this.comment = ""; - this.waitingFor = null; - } - - release(comment) { - return IndexLock.unlock(this, comment || this.comment); - } - - /** - * Locks a path for writing. While the lock is in place, it's value cannot be changed by other transactions. - * @param {string} path path being locked - * @param {string} tid a unique value to identify your transaction - * @param {boolean} forWriting if the record will be written to. Multiple read locks can be granted access at the same time if there is no write lock. Once a write lock is granted, no others can read from or write to it. - * @returns {Promise} returns a promise with the lock object once it is granted. It's .release method can be used as a shortcut to .unlock(path, tid) to release the lock - */ - static lock(path, tid, forWriting = true, comment = '', options = { withPriority: false, noTimeout: false }) { - let lock, proceed; - if (path instanceof IndexLock) { - lock = path; - lock.comment = `(retry: ${lock.comment})`; - proceed = true; - } - else { - lock = new IndexLock(this, path, tid, forWriting, options.withPriority === true); - lock.comment = comment; - _locks.push(lock); - const check = _allowLock(path, tid, forWriting); - lock.waitingFor = check.conflict || null; - proceed = check.allow; - } - - if (proceed) { - lock.state = IndexLock.LOCK_STATE.LOCKED; - lock.granted = Date.now(); - return Promise.resolve(lock); - } - else { - // Keep pending until clashing lock(s) is/are removed - console.assert(lock.state === IndexLock.LOCK_STATE.PENDING); - const p = new Promise((resolve, reject) => { - lock.resolve = resolve; - lock.reject = reject; - }); - return p; - } - } - - static unlock(lock, comment, processQueue = true) {// (path, tid, comment) { - const i = _locks.indexOf(lock); //_locks.findIndex(lock => lock.tid === tid && lock.path === path); - if (i < 0) { - const msg = `lock on "/${lock.path}" for tid ${lock.tid} wasn't found; ${comment}`; - debug.error(`unlock :: ${msg}`); - return Promise.reject(new Error(msg)); - } - lock.state = IndexLock.LOCK_STATE.DONE; - clearTimeout(lock.timeout); - _locks.splice(i, 1); - processQueue && _processLockQueue(); - return Promise.resolve(lock); - } - -} - -/** - * @type {IndexLock[]} - */ -const _locks = []; - -function _allowLock(path, tid, forWriting) { - // Can this lock be granted now or do we have to wait? - const conflict = _locks - .filter(otherLock => otherLock.tid !== tid && otherLock.state === IndexLock.LOCK_STATE.LOCKED) - .find(otherLock => { - return ( - // Other lock clashes with requested lock, if: - // One (or both) of them is for writing - (forWriting || otherLock.forWriting) - - // and requested lock is on the path - && path === otherLock.path - ); - }); - - const clashes = typeof conflict !== 'undefined'; - return { allow: !clashes, conflict }; -} - -function _processLockQueue() { - const pending = _locks - .filter(lock => - lock.state === IndexLock.LOCK_STATE.PENDING - && (lock.waitingFor === null || lock.waitingFor.state !== IndexLock.LOCK_STATE.LOCKED) - ) - .sort((a,b) => { - if (a.priority && !b.priority) { return -1; } - else if (!a.priority && b.priority) { return 1; } - return a.requested < b.requested; - }); - pending.forEach(lock => { - const check = _allowLock(lock.path, lock.tid, lock.forWriting); - lock.waitingFor = check.conflict || null; - if (check.allow) { - IndexLock.lock(lock) - .then(lock.resolve) - .catch(lock.reject); - } - }); } /**