Fixed and improved index locking

This commit is contained in:
Ewout Stortenbeker 2018-12-14 17:32:38 +01:00
parent f7a3ad7370
commit 1ce56800a8

View file

@ -455,124 +455,63 @@ class DataIndex {
return this._updateTree(path, keyValues.oldValue, keyValues.newValue, recordPointer, recordPointer, metadata);
}
// /**
// *
// * @param {string} path
// * @param {any} oldValue
// * @param {any} newValue
// */
// handleRecordUpdate(path, oldValue, newValue) {
// const keyValues = getChildValues(this.key, oldValue, newValue);
// const canBeIndexed = ['number','boolean','string'].indexOf(typeof keyValues.newValue) >= 0 || keyValues.newValue instanceof Date;
// const includedValues = this.includeKeys.map(key => getChildValues(key, oldValue, newValue));
// const keyValueChanged = compareValues(keyValues.oldValue, keyValues.newValue) !== 'identical';
// const includedValuesChanged = includedValues.some(values => compareValues(values.oldValue, values.newValue) !== 'identical');
// if (!keyValueChanged && !includedValuesChanged) {
// return;
// }
// const startTime = Date.now();
// const updatedKey = getPathInfo(path).key;
// // const pathKeys = getPathKeys(path);
// // const indexKeys = getPathKeys(this.path);
// // const wildcardKeys = indexKeys.reduce((wildcards, key, i) => {
// // if (key === '*') { wildcards.push(pathKeys[i]); }
// // return wildcards;
// // }, []);
// const wildcardKeys = this._getWildcardKeys(path);
// const recordPointer = _createRecordPointer(wildcardKeys, updatedKey);
// const metadata = (() => {
// const obj = {};
// this.includeKeys.forEach(key => obj[key] = newValue[key]);
// return obj;
// })();
// // debug.log(`Requesting update lock on index ${this.description}`.blue);
// let lock;
// return this._lock(true, `index.handleRecordUpdate "/${path}"`)
// .then(l => {
// // debug.log(`Got update lock on index ${this.description}`.blue, l);
// lock = l;
// return this._getTree();
// })
// .then(idx => {
// /**
// * @type BinaryBPlusTree
// */
// const tree = idx.tree;
// // const oldEntry = tree.find(keyValues.oldValue);
// const ops = [];
// if (keyValues.oldValue !== null) {
// //ops.push({ type: 'remove', key: keyValues.oldValue, value: recordPointer })
// let op = BinaryBPlusTree.TransactionOperation.remove(keyValues.oldValue, recordPointer);
// ops.push(op);
// }
// if (keyValues.newValue !== null && canBeIndexed) {
// // ops.push({ type: 'add', key: keyValues.newValue, value: recordPointer, metadata });
// let op = BinaryBPlusTree.TransactionOperation.add(keyValues.newValue, recordPointer, metadata);
// ops.push(op);
// }
// return tree.transaction(ops)
// .then(() => {
// // Index updated
// idx.close();
// return false; // not rebuilt
// })
// .catch(err => {
// // Could not update index --> leaf full?
// debug.log(`Could not update index ${this.description}: ${err.message}`.yellow);
// // Rebuild it by getting current content
// return tree.toTreeBuilder(FILL_FACTOR)
// .then(builder => {
// idx.close();
// // Reprocess the changes
// if (keyValues.oldValue !== null) {
// builder.remove(keyValues.oldValue, recordPointer);
// }
// if (keyValues.newValue !== null && canBeIndexed) {
// builder.add(keyValues.newValue, recordPointer, metadata);
// }
// return Uint8Array.from(builder.create().toBinary(true));
// })
// .then(binary => {
// // overwrite the file
// return new Promise((resolve, reject) => {
// fs.writeFile(this.fileName, Buffer.from(binary.buffer), (err) => {
// if (err) {
// debug.error(err);
// reject(err);
// }
// else {
// resolve();
// }
// });
// });
// })
// .then(() => {
// return true; // rebuilt
// });
// })
// .then(rebuilt => {
// const doneTime = Date.now();
// const duration = Math.round((doneTime - startTime) / 1000);
// debug.log(`Index ${this.description} was ${rebuilt ? 'rebuilt' : 'updated'} successfully for "/${path}", took ${duration} seconds`.green);
// });
// })
// .then(() => {
// // debug.log(`Released update lock on index ${this.description}`.blue);
// lock.release();
// });
// }
_lock(forWriting, comment) {
const tid = ID.generate(); // forWriting ? "write-index" : "read-index";
let lockPath = `__index__/${this.path.replace(/\*/g, '__')}/__/${this.key}`;
return IndexLock.lock(lockPath, tid, forWriting, comment, { noTimeout: true });
if (!this._lockQueue) { this._lockQueue = []; }
if (!this._lockState) {
this._lockState = {
isLocked: false,
forWriting: undefined,
comment: undefined
};
}
const lock = { forWriting, comment, release: comment => {
const pending = [];
while (true) {
if (this._lockQueue.length === 0) { break; }
const next = this._lockQueue[0];
if (next.forWriting) {
if (pending.length === 0) {
pending.push(next);
this._lockQueue.shift();
}
break;
}
else {
pending.push(next);
this._lockQueue.shift();
}
}
if (pending.length === 0) {
this._lockState.isLocked = false;
this._lockState.forWriting = undefined;
this._lockState.comment = undefined;
}
else {
this._lockState.forWriting = pending[0].forWriting;
this._lockState.comment = '';
}
for (let i = 0; i < pending.length; i++) {
const lock = pending[i];
if (this._lockState.comment.length > 0) { this._lockState.comment += ' && '}
this._lockState.comment += lock.comment;
lock.resolve(lock);
}
}};
if (this._lockState.isLocked) {
// Queue lock request
this._lockQueue.push(lock);
return new Promise(resolve => {
lock.resolve = resolve;
});
}
else {
// No current lock, allow
this._lockState.isLocked = true;
this._lockState.forWriting = forWriting;
this._lockState.comment = comment;
return Promise.resolve(lock);
}
}
count(op, val) {
@ -1306,145 +1245,6 @@ class IndexQueryResults extends Array {
});
return IndexQueryResults.from(filtered, this.filterKey);
}
}
class IndexLock {
static get LOCK_STATE() {
return {
PENDING: 'pending',
LOCKED: 'locked',
EXPIRED: 'expired',
DONE: 'done'
};
};
/**
* Constructor for a record lock
* @param {Storage} storage
* @param {string} path
* @param {string} tid
* @param {boolean} forWriting
* @param {boolean} priority
*/
constructor(storage, path, tid, forWriting, priority = false) {
this.tid = tid;
this.path = path;
this.forWriting = forWriting;
this.priority = priority;
this.state = IndexLock.LOCK_STATE.PENDING;
this.storage = storage;
this.requested = Date.now();
this.granted = undefined;
this.expires = undefined;
this.comment = "";
this.waitingFor = null;
}
release(comment) {
return IndexLock.unlock(this, comment || this.comment);
}
/**
* Locks a path for writing. While the lock is in place, it's value cannot be changed by other transactions.
* @param {string} path path being locked
* @param {string} tid a unique value to identify your transaction
* @param {boolean} forWriting if the record will be written to. Multiple read locks can be granted access at the same time if there is no write lock. Once a write lock is granted, no others can read from or write to it.
* @returns {Promise<IndexLock>} returns a promise with the lock object once it is granted. It's .release method can be used as a shortcut to .unlock(path, tid) to release the lock
*/
static lock(path, tid, forWriting = true, comment = '', options = { withPriority: false, noTimeout: false }) {
let lock, proceed;
if (path instanceof IndexLock) {
lock = path;
lock.comment = `(retry: ${lock.comment})`;
proceed = true;
}
else {
lock = new IndexLock(this, path, tid, forWriting, options.withPriority === true);
lock.comment = comment;
_locks.push(lock);
const check = _allowLock(path, tid, forWriting);
lock.waitingFor = check.conflict || null;
proceed = check.allow;
}
if (proceed) {
lock.state = IndexLock.LOCK_STATE.LOCKED;
lock.granted = Date.now();
return Promise.resolve(lock);
}
else {
// Keep pending until clashing lock(s) is/are removed
console.assert(lock.state === IndexLock.LOCK_STATE.PENDING);
const p = new Promise((resolve, reject) => {
lock.resolve = resolve;
lock.reject = reject;
});
return p;
}
}
static unlock(lock, comment, processQueue = true) {// (path, tid, comment) {
const i = _locks.indexOf(lock); //_locks.findIndex(lock => lock.tid === tid && lock.path === path);
if (i < 0) {
const msg = `lock on "/${lock.path}" for tid ${lock.tid} wasn't found; ${comment}`;
debug.error(`unlock :: ${msg}`);
return Promise.reject(new Error(msg));
}
lock.state = IndexLock.LOCK_STATE.DONE;
clearTimeout(lock.timeout);
_locks.splice(i, 1);
processQueue && _processLockQueue();
return Promise.resolve(lock);
}
}
/**
* @type {IndexLock[]}
*/
const _locks = [];
function _allowLock(path, tid, forWriting) {
// Can this lock be granted now or do we have to wait?
const conflict = _locks
.filter(otherLock => otherLock.tid !== tid && otherLock.state === IndexLock.LOCK_STATE.LOCKED)
.find(otherLock => {
return (
// Other lock clashes with requested lock, if:
// One (or both) of them is for writing
(forWriting || otherLock.forWriting)
// and requested lock is on the path
&& path === otherLock.path
);
});
const clashes = typeof conflict !== 'undefined';
return { allow: !clashes, conflict };
}
function _processLockQueue() {
const pending = _locks
.filter(lock =>
lock.state === IndexLock.LOCK_STATE.PENDING
&& (lock.waitingFor === null || lock.waitingFor.state !== IndexLock.LOCK_STATE.LOCKED)
)
.sort((a,b) => {
if (a.priority && !b.priority) { return -1; }
else if (!a.priority && b.priority) { return 1; }
return a.requested < b.requested;
});
pending.forEach(lock => {
const check = _allowLock(lock.path, lock.tid, lock.forWriting);
lock.waitingFor = check.conflict || null;
if (check.allow) {
IndexLock.lock(lock)
.then(lock.resolve)
.catch(lock.reject);
}
});
}
/**