From e4ec5b3eb17956e7bdd26b26d2971d77d509fdb4 Mon Sep 17 00:00:00 2001 From: Tom French <15848336+TomAFrench@users.noreply.github.com> Date: Tue, 4 Jul 2023 19:38:52 +0700 Subject: [PATCH] chore: add types to `crdt` package (#1076) This PR adds types to the `crdt` directory to make the structure of the merkle tree (really a merkle radix trie) clearer. --------- Co-authored-by: Jed Fox <git@jedfox.com> Co-authored-by: Matiss Janis Aboltins <matiss@mja.lv> --- packages/crdt/src/crdt/index.ts | 1 + packages/crdt/src/crdt/merkle.test.ts | 16 +- packages/crdt/src/crdt/merkle.ts | 49 ++- packages/crdt/src/crdt/timestamp.test.ts | 10 +- packages/crdt/src/crdt/timestamp.ts | 402 +++++++++--------- packages/crdt/src/main.ts | 1 + packages/loot-core/src/server/main.ts | 2 - .../src/server/migrate/migrations.ts | 31 +- packages/loot-core/src/server/sync/encoder.ts | 10 +- packages/loot-core/src/server/sync/index.ts | 27 +- .../loot-core/src/server/sync/migrate.test.ts | 4 +- packages/loot-core/src/server/sync/migrate.ts | 4 +- packages/loot-core/src/server/sync/repair.ts | 15 +- .../src/server/sync/sync.property.test.ts | 4 +- .../loot-core/src/server/sync/sync.test.ts | 31 +- .../src/server/tests/mockSyncServer.ts | 26 +- packages/loot-core/src/server/undo.ts | 4 +- upcoming-release-notes/1076.md | 6 + 18 files changed, 341 insertions(+), 302 deletions(-) create mode 100644 upcoming-release-notes/1076.md diff --git a/packages/crdt/src/crdt/index.ts b/packages/crdt/src/crdt/index.ts index 18ad17bf1..a42b13ead 100644 --- a/packages/crdt/src/crdt/index.ts +++ b/packages/crdt/src/crdt/index.ts @@ -8,5 +8,6 @@ export { makeClientId, serializeClock, deserializeClock, + Clock, Timestamp, } from './timestamp'; diff --git a/packages/crdt/src/crdt/merkle.test.ts b/packages/crdt/src/crdt/merkle.test.ts index cf7828a1b..a0b7d98a6 100644 --- a/packages/crdt/src/crdt/merkle.test.ts +++ b/packages/crdt/src/crdt/merkle.test.ts @@ -17,7 +17,7 @@ function insertMessages(trie, messages) { describe('merkle trie', () => { test('adding an item works', () => { let trie = merkle.insert( - {}, + merkle.emptyTrie(), Timestamp.parse('2018-11-12T13:21:40.122Z-0000-0123456789ABCDEF'), ); trie = merkle.insert( @@ -28,8 +28,8 @@ describe('merkle trie', () => { }); test('diff returns the correct time difference', () => { - let trie1: { hash?: unknown } = {}; - let trie2: { hash?: unknown } = {}; + let trie1 = merkle.emptyTrie(); + let trie2 = merkle.emptyTrie(); const messages = [ // First client messages @@ -65,9 +65,9 @@ describe('merkle trie', () => { }); test('diffing works with empty tries', () => { - let trie1 = {}; + let trie1 = merkle.emptyTrie(); let trie2 = merkle.insert( - {}, + merkle.emptyTrie(), Timestamp.parse('2009-01-02T10:17:37.789Z-0000-0000testinguuid1'), ); @@ -90,7 +90,7 @@ describe('merkle trie', () => { message('2018-11-01T02:37:00.000Z-0000-0123456789ABCDEF', 2100), ]; - let trie: { hash?: unknown } = {}; + let trie = merkle.emptyTrie(); messages.forEach(msg => { trie = merkle.insert(trie, msg.timestamp); }); @@ -122,10 +122,10 @@ describe('merkle trie', () => { // Case 0: It always returns a base time when comparing with an // empty trie - expect(new Date(merkle.diff({}, trie)).toISOString()).toBe( + expect(new Date(merkle.diff(merkle.emptyTrie(), trie)).toISOString()).toBe( '1970-01-01T00:00:00.000Z', ); - expect(new Date(merkle.diff(trie, {})).toISOString()).toBe( + expect(new Date(merkle.diff(trie, merkle.emptyTrie())).toISOString()).toBe( '1970-01-01T00:00:00.000Z', ); diff --git a/packages/crdt/src/crdt/merkle.ts b/packages/crdt/src/crdt/merkle.ts index 48df9b0ef..d77b4f807 100644 --- a/packages/crdt/src/crdt/merkle.ts +++ b/packages/crdt/src/crdt/merkle.ts @@ -7,11 +7,27 @@ // * Need to check to make sure if account exists when handling // * transaction changes in syncing -export function getKeys(trie) { - return Object.keys(trie).filter(x => x !== 'hash'); +import { Timestamp } from './timestamp'; + +/** + * Represents a node within a trinary radix trie. + */ +export type TrieNode = { + '0'?: TrieNode; + '1'?: TrieNode; + '2'?: TrieNode; + hash: number; +}; + +export function emptyTrie(): TrieNode { + return { hash: 0 }; } -export function keyToTimestamp(key) { +export function getKeys(trie: TrieNode): ('0' | '1' | '2')[] { + return Object.keys(trie).filter(x => x !== 'hash') as ('0' | '1' | '2')[]; +} + +export function keyToTimestamp(key: string): number { // 16 is the length of the base 3 value of the current time in // minutes. Ensure it's padded to create the full value let fullkey = key + '0'.repeat(16 - key.length); @@ -20,7 +36,10 @@ export function keyToTimestamp(key) { return parseInt(fullkey, 3) * 1000 * 60; } -export function insert(trie, timestamp) { +/** + * Mutates `trie` to insert a node at `timestamp` + */ +export function insert(trie: TrieNode, timestamp: Timestamp) { let hash = timestamp.hash(); let key = Number(Math.floor(timestamp.millis() / 1000 / 60)).toString(3); @@ -28,7 +47,7 @@ export function insert(trie, timestamp) { return insertKey(trie, key, hash); } -function insertKey(trie, key, hash) { +function insertKey(trie: TrieNode, key: string, hash: number) { if (key.length === 0) { return trie; } @@ -41,15 +60,15 @@ function insertKey(trie, key, hash) { }); } -export function build(timestamps) { - let trie = {}; +export function build(timestamps: Timestamp[]) { + let trie = emptyTrie(); for (let timestamp of timestamps) { insert(trie, timestamp); } return trie; } -export function diff(trie1, trie2) { +export function diff(trie1: TrieNode, trie2: TrieNode): number { if (trie1.hash === trie2.hash) { return null; } @@ -103,12 +122,12 @@ export function diff(trie1, trie2) { } k += diffkey; - node1 = node1[diffkey] || {}; - node2 = node2[diffkey] || {}; + node1 = node1[diffkey] || emptyTrie(); + node2 = node2[diffkey] || emptyTrie(); } } -export function prune(trie, n = 2) { +export function prune(trie: TrieNode, n = 2): TrieNode { // Do nothing if empty if (!trie.hash) { return trie; @@ -118,12 +137,16 @@ export function prune(trie, n = 2) { keys.sort(); let next = { hash: trie.hash }; - keys = keys.slice(-n).map(k => (next[k] = prune(trie[k], n))); + + // Prune child nodes. + for (let k of keys.slice(-n)) { + next[k] = prune(trie[k], n); + } return next; } -export function debug(trie, k = '', indent = 0) { +export function debug(trie: TrieNode, k = '', indent = 0) { const str = ' '.repeat(indent) + (k !== '' ? `k: ${k} ` : '') + diff --git a/packages/crdt/src/crdt/timestamp.test.ts b/packages/crdt/src/crdt/timestamp.test.ts index 5dde3525e..c1fb77f21 100644 --- a/packages/crdt/src/crdt/timestamp.test.ts +++ b/packages/crdt/src/crdt/timestamp.test.ts @@ -16,10 +16,10 @@ describe('Timestamp', function () { describe('comparison', function () { it('should be in order', function () { - expect(Timestamp.zero()).toBe(Timestamp.zero()); - expect(Timestamp.max() > Timestamp.zero()).toBeTruthy(); - expect(Timestamp.send() > Timestamp.zero()).toBeTruthy(); - expect(Timestamp.send() < Timestamp.max()).toBeTruthy(); + expect(Timestamp.zero).toBe(Timestamp.zero); + expect(Timestamp.max > Timestamp.zero).toBeTruthy(); + expect(Timestamp.send() > Timestamp.zero).toBeTruthy(); + expect(Timestamp.send() < Timestamp.max).toBeTruthy(); }); }); @@ -42,7 +42,7 @@ describe('Timestamp', function () { '9999-12-31T23:59:59.999Z-FFFF-10000000000000000', ]; for (let invalidInput of invalidInputs) { - expect(Timestamp.parse(invalidInput)).toBe(null); + expect(Timestamp.parse(invalidInput as string)).toBe(null); } }); diff --git a/packages/crdt/src/crdt/timestamp.ts b/packages/crdt/src/crdt/timestamp.ts index 16a3d4a0c..190924c7e 100644 --- a/packages/crdt/src/crdt/timestamp.ts +++ b/packages/crdt/src/crdt/timestamp.ts @@ -1,6 +1,8 @@ import murmurhash from 'murmurhash'; import { v4 as uuidv4 } from 'uuid'; +import { TrieNode } from './merkle'; + /** * Hybrid Unique Logical Clock (HULC) timestamp generator * @@ -24,29 +26,37 @@ import { v4 as uuidv4 } from 'uuid'; * http://www.cse.buffalo.edu/tech-reports/2014-04.pdf */ +export type Clock = { + timestamp: MutableTimestamp; + merkle: TrieNode; +}; + // A mutable global clock -let clock = null; +let clock: Clock = null; -export function setClock(clock_) { +export function setClock(clock_: Clock): void { clock = clock_; } -export function getClock() { +export function getClock(): Clock { return clock; } -export function makeClock(timestamp, merkle = {}) { +export function makeClock( + timestamp: Timestamp, + merkle: TrieNode = { hash: 0 }, +) { return { timestamp: MutableTimestamp.from(timestamp), merkle }; } -export function serializeClock(clock) { +export function serializeClock(clock: Clock): string { return JSON.stringify({ timestamp: clock.timestamp.toString(), merkle: clock.merkle, }); } -export function deserializeClock(clock) { +export function deserializeClock(clock: string): Clock { let data; try { data = JSON.parse(clock); @@ -79,18 +89,7 @@ const MAX_NODE_LENGTH = 16; * timestamp instance class */ export class Timestamp { - static init; - static max; - static parse; - static recv; - static send; - static since; - static zero; - static ClockDriftError; - static DuplicateNodeError; - static OverflowError; - - _state; + _state: { millis: number; counter: number; node: string }; constructor(millis: number, counter: number, node: string) { this._state = { @@ -127,222 +126,221 @@ export class Timestamp { hash() { return murmurhash.v3(this.toString()); } -} - -class MutableTimestamp extends Timestamp { - static from; - - setMillis(n) { - this._state.millis = n; - } - setCounter(n) { - this._state.counter = n; - } + // Timestamp generator initialization + // * sets the node ID to an arbitrary value + // * useful for mocking/unit testing + static init(options: { maxDrift?: number; node?: string } = {}) { + if (options.maxDrift) { + config.maxDrift = options.maxDrift; + } - setNode(n) { - this._state.node = n; + setClock( + makeClock( + new Timestamp( + 0, + 0, + options.node + ? ('0000000000000000' + options.node).toString().slice(-16) + : '', + ), + ), + ); } -} -MutableTimestamp.from = timestamp => { - return new MutableTimestamp( - timestamp.millis(), - timestamp.counter(), - timestamp.node(), + /** + * maximum timestamp + */ + static max = Timestamp.parse( + '9999-12-31T23:59:59.999Z-FFFF-FFFFFFFFFFFFFFFF', ); -}; -// Timestamp generator initialization -// * sets the node ID to an arbitrary value -// * useful for mocking/unit testing -Timestamp.init = function (options: { maxDrift?: number; node?: string } = {}) { - if (options.maxDrift) { - config.maxDrift = options.maxDrift; + /** + * timestamp parsing + * converts a fixed-length string timestamp to the structured value + */ + static parse(timestamp: string | Timestamp): Timestamp | null { + if (timestamp instanceof Timestamp) { + return timestamp; + } + if (typeof timestamp === 'string') { + let parts = timestamp.split('-'); + if (parts && parts.length === 5) { + let millis = Date.parse(parts.slice(0, 3).join('-')).valueOf(); + let counter = parseInt(parts[3], 16); + let node = parts[4]; + if ( + !isNaN(millis) && + millis >= 0 && + !isNaN(counter) && + counter <= MAX_COUNTER && + typeof node === 'string' && + node.length <= MAX_NODE_LENGTH + ) { + return new Timestamp(millis, counter, node); + } + } + } + return null; } - setClock( - makeClock( - new Timestamp( - 0, - 0, - options.node - ? ('0000000000000000' + options.node).toString().slice(-16) - : '', - ), - ), - ); -}; + /** + * Timestamp send. Generates a unique, monotonic timestamp suitable + * for transmission to another system in string format + */ + static send(): Timestamp | null { + if (!clock) { + return null; + } -/** - * Timestamp send. Generates a unique, monotonic timestamp suitable - * for transmission to another system in string format - */ -Timestamp.send = function () { - if (!clock) { - return null; - } + // retrieve the local wall time + let phys = Date.now(); - // retrieve the local wall time - let phys = Date.now(); + // unpack the clock.timestamp logical time and counter + let lOld = clock.timestamp.millis(); + let cOld = clock.timestamp.counter(); - // unpack the clock.timestamp logical time and counter - let lOld = clock.timestamp.millis(); - let cOld = clock.timestamp.counter(); + // calculate the next logical time and counter + // * ensure that the logical time never goes backward + // * increment the counter if phys time does not advance + let lNew = Math.max(lOld, phys); + let cNew = lOld === lNew ? cOld + 1 : 0; - // calculate the next logical time and counter - // * ensure that the logical time never goes backward - // * increment the counter if phys time does not advance - let lNew = Math.max(lOld, phys); - let cNew = lOld === lNew ? cOld + 1 : 0; + // check the result for drift and counter overflow + if (lNew - phys > config.maxDrift) { + throw new Timestamp.ClockDriftError(lNew, phys, config.maxDrift); + } + if (cNew > MAX_COUNTER) { + throw new Timestamp.OverflowError(); + } - // check the result for drift and counter overflow - if (lNew - phys > config.maxDrift) { - throw new Timestamp.ClockDriftError(lNew, phys, config.maxDrift); - } - if (cNew > MAX_COUNTER) { - throw new Timestamp.OverflowError(); + // repack the logical time/counter + clock.timestamp.setMillis(lNew); + clock.timestamp.setCounter(cNew); + + return new Timestamp( + clock.timestamp.millis(), + clock.timestamp.counter(), + clock.timestamp.node(), + ); } - // repack the logical time/counter - clock.timestamp.setMillis(lNew); - clock.timestamp.setCounter(cNew); + // Timestamp receive. Parses and merges a timestamp from a remote + // system with the local timeglobal uniqueness and monotonicity are + // preserved + static recv(msg: Timestamp): Timestamp | null { + if (!clock) { + return null; + } - return new Timestamp( - clock.timestamp.millis(), - clock.timestamp.counter(), - clock.timestamp.node(), - ); -}; + // retrieve the local wall time + let phys = Date.now(); -// Timestamp receive. Parses and merges a timestamp from a remote -// system with the local timeglobal uniqueness and monotonicity are -// preserved -Timestamp.recv = function (msg) { - if (!clock) { - return null; - } + // unpack the message wall time/counter + let lMsg = msg.millis(); + let cMsg = msg.counter(); - // retrieve the local wall time - let phys = Date.now(); + // assert the node id and remote clock drift + // if (msg.node() === clock.timestamp.node()) { + // throw new Timestamp.DuplicateNodeError(clock.timestamp.node()); + // } + if (lMsg - phys > config.maxDrift) { + throw new Timestamp.ClockDriftError(); + } - // unpack the message wall time/counter - let lMsg = msg.millis(); - let cMsg = msg.counter(); + // unpack the clock.timestamp logical time and counter + let lOld = clock.timestamp.millis(); + let cOld = clock.timestamp.counter(); + + // calculate the next logical time and counter + // . ensure that the logical time never goes backward + // . if all logical clocks are equal, increment the max counter + // . if max = old > message, increment local counter + // . if max = messsage > old, increment message counter + // . otherwise, clocks are monotonic, reset counter + let lNew = Math.max(Math.max(lOld, phys), lMsg); + let cNew = + lNew === lOld && lNew === lMsg + ? Math.max(cOld, cMsg) + 1 + : lNew === lOld + ? cOld + 1 + : lNew === lMsg + ? cMsg + 1 + : 0; + + // check the result for drift and counter overflow + if (lNew - phys > config.maxDrift) { + throw new Timestamp.ClockDriftError(); + } + if (cNew > MAX_COUNTER) { + throw new Timestamp.OverflowError(); + } - // assert the node id and remote clock drift - // if (msg.node() === clock.timestamp.node()) { - // throw new Timestamp.DuplicateNodeError(clock.timestamp.node()); - // } - if (lMsg - phys > config.maxDrift) { - throw new Timestamp.ClockDriftError(); - } + // repack the logical time/counter + clock.timestamp.setMillis(lNew); + clock.timestamp.setCounter(cNew); - // unpack the clock.timestamp logical time and counter - let lOld = clock.timestamp.millis(); - let cOld = clock.timestamp.counter(); - - // calculate the next logical time and counter - // . ensure that the logical time never goes backward - // . if all logical clocks are equal, increment the max counter - // . if max = old > message, increment local counter - // . if max = messsage > old, increment message counter - // . otherwise, clocks are monotonic, reset counter - let lNew = Math.max(Math.max(lOld, phys), lMsg); - let cNew = - lNew === lOld && lNew === lMsg - ? Math.max(cOld, cMsg) + 1 - : lNew === lOld - ? cOld + 1 - : lNew === lMsg - ? cMsg + 1 - : 0; - - // check the result for drift and counter overflow - if (lNew - phys > config.maxDrift) { - throw new Timestamp.ClockDriftError(); - } - if (cNew > MAX_COUNTER) { - throw new Timestamp.OverflowError(); + return new Timestamp( + clock.timestamp.millis(), + clock.timestamp.counter(), + clock.timestamp.node(), + ); } - // repack the logical time/counter - clock.timestamp.setMillis(lNew); - clock.timestamp.setCounter(cNew); - - return new Timestamp( - clock.timestamp.millis(), - clock.timestamp.counter(), - clock.timestamp.node(), + /** + * zero/minimum timestamp + */ + static zero = Timestamp.parse( + '1970-01-01T00:00:00.000Z-0000-0000000000000000', ); -}; -/** - * timestamp parsing - * converts a fixed-length string timestamp to the structured value - */ -Timestamp.parse = function (timestamp: string): Timestamp | null { - if (typeof timestamp === 'string') { - let parts = timestamp.split('-'); - if (parts && parts.length === 5) { - let millis = Date.parse(parts.slice(0, 3).join('-')).valueOf(); - let counter = parseInt(parts[3], 16); - let node = parts[4]; - if ( - !isNaN(millis) && - millis >= 0 && - !isNaN(counter) && - counter <= MAX_COUNTER && - typeof node === 'string' && - node.length <= MAX_NODE_LENGTH - ) { - return new Timestamp(millis, counter, node); - } + static since = isoString => isoString + '-0000-0000000000000000'; + + /** + * error classes + */ + static DuplicateNodeError = class DuplicateNodeError extends Error { + constructor(node: string) { + super('duplicate node identifier ' + node); + this.name = 'DuplicateNodeError'; } - } - return null; -}; + }; -/** - * zero/minimum timestamp - */ -let zero = Timestamp.parse('1970-01-01T00:00:00.000Z-0000-0000000000000000'); -Timestamp.zero = function () { - return zero; -}; + static ClockDriftError = class ClockDriftError extends Error { + constructor(...args: unknown[]) { + super( + ['maximum clock drift exceeded'].concat(args as string[]).join(' '), + ); + this.name = 'ClockDriftError'; + } + }; -/** - * maximum timestamp - */ -let max = Timestamp.parse('9999-12-31T23:59:59.999Z-FFFF-FFFFFFFFFFFFFFFF'); -Timestamp.max = function () { - return max; -}; + static OverflowError = class OverflowError extends Error { + constructor() { + super('timestamp counter overflow'); + this.name = 'OverflowError'; + } + }; +} -Timestamp.since = isoString => { - return isoString + '-0000-0000000000000000'; -}; +class MutableTimestamp extends Timestamp { + static from(timestamp) { + return new MutableTimestamp( + timestamp.millis(), + timestamp.counter(), + timestamp.node(), + ); + } -/** - * error classes - */ -Timestamp.DuplicateNodeError = class extends Error { - constructor(node) { - super('duplicate node identifier ' + node); - this.name = 'DuplicateNodeError'; + setMillis(n) { + this._state.millis = n; } -}; -Timestamp.ClockDriftError = class extends Error { - constructor(...args) { - super(['maximum clock drift exceeded'].concat(args).join(' ')); - this.name = 'ClockDriftError'; + setCounter(n) { + this._state.counter = n; } -}; -Timestamp.OverflowError = class extends Error { - constructor() { - super('timestamp counter overflow'); - this.name = 'OverflowError'; + setNode(n) { + this._state.node = n; } -}; +} diff --git a/packages/crdt/src/main.ts b/packages/crdt/src/main.ts index 76c4159a5..ff5d5dacb 100644 --- a/packages/crdt/src/main.ts +++ b/packages/crdt/src/main.ts @@ -7,6 +7,7 @@ export { makeClientId, serializeClock, deserializeClock, + Clock, Timestamp, } from './crdt'; diff --git a/packages/loot-core/src/server/main.ts b/packages/loot-core/src/server/main.ts index a9eacee60..b4fcf52ea 100644 --- a/packages/loot-core/src/server/main.ts +++ b/packages/loot-core/src/server/main.ts @@ -61,7 +61,6 @@ import { setSyncingMode, makeTestMessage, clearFullSyncTimeout, - syncAndReceiveMessages, resetSync, repairSync, } from './sync'; @@ -2462,7 +2461,6 @@ export const lib = { return res; }, on: (name, func) => app.events.on(name, func), - syncAndReceiveMessages, q, db, diff --git a/packages/loot-core/src/server/migrate/migrations.ts b/packages/loot-core/src/server/migrate/migrations.ts index ca229ca68..f4b6b3993 100644 --- a/packages/loot-core/src/server/migrate/migrations.ts +++ b/packages/loot-core/src/server/migrate/migrations.ts @@ -1,6 +1,7 @@ // We have to bundle in JS migrations manually to avoid having to `eval` // them which doesn't play well with CSP. There isn't great, and eventually // we can remove this migration. +import { Database } from 'better-sqlite3'; import { v4 as uuidv4 } from 'uuid'; import m1632571489012 from '../../../migrations/1632571489012_remove_cache'; @@ -13,18 +14,21 @@ let javascriptMigrations = { 1632571489012: m1632571489012, }; -export async function withMigrationsDir(dir, func) { +export async function withMigrationsDir( + dir: string, + func: () => Promise<void>, +): Promise<void> { let oldDir = MIGRATIONS_DIR; MIGRATIONS_DIR = dir; await func(); MIGRATIONS_DIR = oldDir; } -export function getMigrationsDir() { +export function getMigrationsDir(): string { return MIGRATIONS_DIR; } -function getMigrationId(name) { +function getMigrationId(name: string): number { return parseInt(name.match(/^(\d)+/)[0]); } @@ -36,7 +40,7 @@ export function getUpMigration(id, names) { } } -export async function getAppliedMigrations(db) { +export async function getAppliedMigrations(db: Database): Promise<number[]> { const rows = await sqlite.runQuery<{ id: number }>( db, 'SELECT * FROM __migrations__ ORDER BY id ASC', @@ -46,7 +50,9 @@ export async function getAppliedMigrations(db) { return rows.map(row => row.id); } -export async function getMigrationList(migrationsDir) { +export async function getMigrationList( + migrationsDir: string, +): Promise<string[]> { const files = await fs.listDir(migrationsDir); return files .filter(name => name.match(/(\.sql|\.js)$/)) @@ -62,7 +68,7 @@ export async function getMigrationList(migrationsDir) { }); } -export function getPending(appliedIds, all) { +export function getPending(appliedIds: number[], all: string[]): string[] { return all.filter(name => { const id = getMigrationId(name); return appliedIds.indexOf(id) === -1; @@ -94,7 +100,11 @@ async function applySql(db, sql) { } } -export async function applyMigration(db, name, migrationsDir) { +export async function applyMigration( + db: Database, + name: string, + migrationsDir: string, +): Promise<void> { const code = await fs.readFile(fs.join(migrationsDir, name)); if (name.match(/\.js$/)) { await applyJavaScript(db, getMigrationId(name)); @@ -106,7 +116,10 @@ export async function applyMigration(db, name, migrationsDir) { ]); } -function checkDatabaseValidity(appliedIds, available) { +function checkDatabaseValidity( + appliedIds: number[], + available: string[], +): void { for (let i = 0; i < appliedIds.length; i++) { if ( i >= available.length || @@ -121,7 +134,7 @@ function checkDatabaseValidity(appliedIds, available) { } } -export async function migrate(db) { +export async function migrate(db: Database): Promise<string[]> { let appliedIds = await getAppliedMigrations(db); let available = await getMigrationList(MIGRATIONS_DIR); diff --git a/packages/loot-core/src/server/sync/encoder.ts b/packages/loot-core/src/server/sync/encoder.ts index aa4515640..fc6df6c32 100644 --- a/packages/loot-core/src/server/sync/encoder.ts +++ b/packages/loot-core/src/server/sync/encoder.ts @@ -1,4 +1,4 @@ -import { SyncProtoBuf } from '@actual-app/crdt'; +import { Timestamp, SyncProtoBuf } from '@actual-app/crdt'; import * as encryption from '../encryption'; import { SyncError } from '../errors'; @@ -20,7 +20,7 @@ function coerceBuffer(value) { export async function encode( groupId: string, fileId: string, - since: string, + since: Timestamp, messages: Message[], ): Promise<Uint8Array> { let { encryptKeyId } = prefs.getPrefs(); @@ -29,7 +29,7 @@ export async function encode( for (let i = 0; i < messages.length; i++) { let msg = messages[i]; let envelopePb = new SyncProtoBuf.MessageEnvelope(); - envelopePb.setTimestamp(msg.timestamp); + envelopePb.setTimestamp(msg.timestamp.toString()); let messagePb = new SyncProtoBuf.Message(); messagePb.setDataset(msg.dataset); @@ -66,7 +66,7 @@ export async function encode( requestPb.setGroupid(groupId); requestPb.setFileid(fileId); requestPb.setKeyid(encryptKeyId); - requestPb.setSince(since); + requestPb.setSince(since.toString()); return requestPb.serializeBinary(); } @@ -83,7 +83,7 @@ export async function decode( for (let i = 0; i < list.length; i++) { let envelopePb = list[i]; - let timestamp = envelopePb.getTimestamp(); + let timestamp = Timestamp.parse(envelopePb.getTimestamp()); let encrypted = envelopePb.getIsencrypted(); let msg; diff --git a/packages/loot-core/src/server/sync/index.ts b/packages/loot-core/src/server/sync/index.ts index d2ee55476..6b0a1b9dc 100644 --- a/packages/loot-core/src/server/sync/index.ts +++ b/packages/loot-core/src/server/sync/index.ts @@ -249,7 +249,7 @@ export type Message = { dataset: string; old?: unknown; row: string; - timestamp: string; + timestamp: Timestamp; value: string | number | null; }; @@ -333,9 +333,8 @@ export const applyMessages = sequential(async (messages: Message[]) => { db.transaction(() => { let added = new Set(); - for (let i = 0; i < messages.length; i++) { - let msg = messages[i]; - let { dataset, row, column, timestamp, value } = msg; + for (const msg of messages) { + const { dataset, row, column, timestamp, value } = msg; if (!msg.old) { apply(msg, getIn(oldData, [dataset, row]) || added.has(dataset + row)); @@ -357,7 +356,7 @@ export const applyMessages = sequential(async (messages: Message[]) => { [timestamp.toString(), dataset, row, column, serializeValue(value)], ); - currentMerkle = merkle.insert(currentMerkle, msg.timestamp); + currentMerkle = merkle.insert(currentMerkle, timestamp); } } @@ -487,7 +486,7 @@ export async function sendMessages(messages: Message[]) { } } -export function getMessagesSince(since: string) { +export function getMessagesSince(since: string): Message[] { return db.runQuery( 'SELECT timestamp, dataset, row, column, value FROM messages_crdt WHERE timestamp > ?', [since], @@ -495,21 +494,6 @@ export function getMessagesSince(since: string) { ); } -export async function syncAndReceiveMessages( - messages: Message[], - since: string, -): Promise<Message[]> { - let localMessages = await getMessagesSince(since); - await receiveMessages( - messages.map(msg => ({ - ...msg, - value: deserializeValue(msg.value as string), - timestamp: Timestamp.parse(msg.timestamp), - })), - ); - return localMessages; -} - export function clearFullSyncTimeout(): void { if (syncTimeout) { clearTimeout(syncTimeout); @@ -691,7 +675,6 @@ async function _fullSync( res.messages.map(msg => ({ ...msg, value: deserializeValue(msg.value as string), - timestamp: Timestamp.parse(msg.timestamp), })), ); } diff --git a/packages/loot-core/src/server/sync/migrate.test.ts b/packages/loot-core/src/server/sync/migrate.test.ts index b2fd82fed..459f8da0c 100644 --- a/packages/loot-core/src/server/sync/migrate.test.ts +++ b/packages/loot-core/src/server/sync/migrate.test.ts @@ -1,3 +1,4 @@ +import { Timestamp } from '@actual-app/crdt'; import fc from 'fast-check'; import * as arbs from '../../mocks/arbitrary-schema'; @@ -39,7 +40,8 @@ let messageArb: fc.Arbitrary<Message> = fc }) .noBias() .noShrink() - .map(date => date.toISOString() + '-0000-0123456789ABCDEF'); + .map(date => date.toISOString() + '-0000-0123456789ABCDEF') + .map(Timestamp.parse); return fc.record<Message>({ timestamp: timestamp, diff --git a/packages/loot-core/src/server/sync/migrate.ts b/packages/loot-core/src/server/sync/migrate.ts index 6238aa41a..dbc0fd951 100644 --- a/packages/loot-core/src/server/sync/migrate.ts +++ b/packages/loot-core/src/server/sync/migrate.ts @@ -1,11 +1,11 @@ import { Timestamp } from '@actual-app/crdt'; -import { addSyncListener, applyMessages } from './index'; +import { Message, addSyncListener, applyMessages } from './index'; function migrateParentIds(_oldValues, newValues) { newValues.forEach((items, table) => { if (table === 'transactions') { - let toApply = []; + let toApply: Message[] = []; items.forEach(newValue => { if ( diff --git a/packages/loot-core/src/server/sync/repair.ts b/packages/loot-core/src/server/sync/repair.ts index e8736eb69..ba55acd13 100644 --- a/packages/loot-core/src/server/sync/repair.ts +++ b/packages/loot-core/src/server/sync/repair.ts @@ -2,9 +2,16 @@ import { serializeClock, getClock, Timestamp, merkle } from '@actual-app/crdt'; import * as db from '../db'; -export function rebuildMerkleHash() { - let rows = db.runQuery('SELECT timestamp FROM messages_crdt', [], true); - let trie: Record<string, unknown> = {}; +export function rebuildMerkleHash(): { + numMessages: number; + trie: merkle.TrieNode; +} { + let rows: { timestamp: string }[] = db.runQuery( + 'SELECT timestamp FROM messages_crdt', + [], + true, + ); + let trie = merkle.emptyTrie(); for (let i = 0; i < rows.length; i++) { trie = merkle.insert(trie, Timestamp.parse(rows[i].timestamp)); @@ -16,7 +23,7 @@ export function rebuildMerkleHash() { }; } -export default async function repairSync() { +export default async function repairSync(): Promise<void> { let rebuilt = rebuildMerkleHash(); let clock = getClock(); diff --git a/packages/loot-core/src/server/sync/sync.property.test.ts b/packages/loot-core/src/server/sync/sync.property.test.ts index f43f63b28..94ad71c2a 100644 --- a/packages/loot-core/src/server/sync/sync.property.test.ts +++ b/packages/loot-core/src/server/sync/sync.property.test.ts @@ -251,7 +251,7 @@ async function run(msgs) { await encoder.encode( 'group', clientId2, - Timestamp.zero(), + Timestamp.zero, res.secondMessages.map(x => ({ ...x, value: sync.serializeValue(x.value), @@ -270,7 +270,7 @@ async function run(msgs) { await encoder.encode( 'group', clientId2, - Timestamp.zero(), + Timestamp.zero, res.secondMessages.map(x => ({ ...x, value: sync.serializeValue(x.value), diff --git a/packages/loot-core/src/server/sync/sync.test.ts b/packages/loot-core/src/server/sync/sync.test.ts index 91cc537d9..a0a8673c7 100644 --- a/packages/loot-core/src/server/sync/sync.test.ts +++ b/packages/loot-core/src/server/sync/sync.test.ts @@ -97,28 +97,32 @@ describe('Sync', () => { prefs.loadPrefs(); prefs.savePrefs({ groupId: 'group', - lastSyncedTimestamp: Timestamp.zero().toString(), + lastSyncedTimestamp: Timestamp.zero.toString(), }); await mockSyncServer.handlers['/sync/sync']( await encoder.encode( 'group', 'client', - '1970-01-01T01:17:37.000Z-0000-0000testinguuid2', + Timestamp.parse('1970-01-01T01:17:37.000Z-0000-0000testinguuid2'), [ { dataset: 'transactions', row: 'foo', column: 'amount', value: 'N:3200', - timestamp: '1970-01-02T05:17:36.789Z-0000-0000testinguuid2', + timestamp: Timestamp.parse( + '1970-01-02T05:17:36.789Z-0000-0000testinguuid2', + ), }, { dataset: 'transactions', row: 'foo', column: 'amount', value: 'N:4200', - timestamp: '1970-01-02T10:17:36.999Z-0000-0000testinguuid2', + timestamp: Timestamp.parse( + '1970-01-02T10:17:36.999Z-0000-0000testinguuid2', + ), }, ], ), @@ -153,7 +157,7 @@ async function asSecondClient(func) { prefs.loadPrefs(); prefs.savePrefs({ groupId: 'group', - lastSyncedTimestamp: Timestamp.zero().toString(), + lastSyncedTimestamp: Timestamp.zero.toString(), }); await func(); @@ -161,7 +165,7 @@ async function asSecondClient(func) { await global.emptyDatabase()(); prefs.savePrefs({ groupId: 'group', - lastSyncedTimestamp: Timestamp.zero().toString(), + lastSyncedTimestamp: Timestamp.zero.toString(), }); } @@ -236,10 +240,7 @@ describe('Sync projections', () => { registerBudgetMonths(['2017-01', '2017-02']); // Get all the messages. We'll apply them in two passes - let messages = mockSyncServer.getMessages().map(msg => ({ - ...msg, - timestamp: Timestamp.parse(msg.timestamp), - })); + let messages = mockSyncServer.getMessages(); // Apply all but the last message (which deletes the category) await applyMessages(messages.slice(0, -1)); @@ -290,10 +291,7 @@ describe('Sync projections', () => { registerBudgetMonths(['2017-01', '2017-02']); // Get all the messages. We'll apply them in two passes - let messages = mockSyncServer.getMessages().map(msg => ({ - ...msg, - timestamp: Timestamp.parse(msg.timestamp), - })); + let messages = mockSyncServer.getMessages(); let firstMessages = messages.filter(m => m.column !== 'tombstone'); let secondMessages = messages.filter(m => m.column === 'tombstone'); @@ -327,10 +325,7 @@ describe('Sync projections', () => { registerBudgetMonths(['2017-01', '2017-02']); // Get all the messages. We'll apply them in two passes - let messages = mockSyncServer.getMessages().map(msg => ({ - ...msg, - timestamp: Timestamp.parse(msg.timestamp), - })); + let messages = mockSyncServer.getMessages(); let firstMessages = messages.slice(0, -2); let secondMessages = messages.slice(-2); diff --git a/packages/loot-core/src/server/tests/mockSyncServer.ts b/packages/loot-core/src/server/tests/mockSyncServer.ts index 1e64534ea..09db3b763 100644 --- a/packages/loot-core/src/server/tests/mockSyncServer.ts +++ b/packages/loot-core/src/server/tests/mockSyncServer.ts @@ -1,11 +1,23 @@ -import { makeClock, Timestamp, merkle, SyncProtoBuf } from '@actual-app/crdt'; +import { + Clock, + makeClock, + Timestamp, + merkle, + SyncProtoBuf, +} from '@actual-app/crdt'; + +import { Message } from '../sync'; import { basic as defaultMockData } from './mockData.json'; const handlers = {}; let currentMockData = defaultMockData; let currentClock = makeClock(new Timestamp(0, 0, '0000000000000000')); -let currentMessages = []; +let currentMessages: { + timestamp: string; + is_encrypted: boolean; + content: Uint8Array; +}[] = []; // Ugh, this is duplicated... function deserializeValue(value) { @@ -27,7 +39,7 @@ handlers['/'] = () => { return 'development'; }; -handlers['/sync/sync'] = async data => { +handlers['/sync/sync'] = async (data: Uint8Array): Promise<Uint8Array> => { let requestPb = SyncProtoBuf.SyncRequest.deserializeBinary(data); let since = requestPb.getSince(); let messages = requestPb.getMessagesList(); @@ -39,7 +51,7 @@ handlers['/sync/sync'] = async data => { currentMessages.push({ timestamp: msg.getTimestamp(), is_encrypted: msg.getIsencrypted(), - content: msg.getContent(), + content: msg.getContent_asU8(), }); currentClock.merkle = merkle.insert( @@ -104,17 +116,17 @@ export const reset = () => { currentMessages = []; }; -export const getClock = () => { +export const getClock = (): Clock => { return currentClock; }; -export const getMessages = () => { +export const getMessages = (): Message[] => { return currentMessages.map(msg => { let { timestamp, content } = msg; let fields = SyncProtoBuf.Message.deserializeBinary(content); return { - timestamp: timestamp, + timestamp: Timestamp.parse(timestamp), dataset: fields.getDataset(), row: fields.getRow(), column: fields.getColumn(), diff --git a/packages/loot-core/src/server/undo.ts b/packages/loot-core/src/server/undo.ts index f21766309..4fffa9049 100644 --- a/packages/loot-core/src/server/undo.ts +++ b/packages/loot-core/src/server/undo.ts @@ -4,7 +4,7 @@ import * as connection from '../platform/server/connection'; import { getIn } from '../shared/util'; import { withMutatorContext, getMutatorContext } from './mutators'; -import { sendMessages } from './sync'; +import { Message, sendMessages } from './sync'; // A marker always sits as the first entry to simplify logic type MarkerMessage = { type: 'marker'; meta?: unknown }; @@ -228,7 +228,7 @@ export async function redo() { } } -function redoResurrections(messages, oldData) { +function redoResurrections(messages, oldData): Message[] { let resurrect = new Set<string>(); messages.forEach(message => { diff --git a/upcoming-release-notes/1076.md b/upcoming-release-notes/1076.md new file mode 100644 index 000000000..d5e0350fc --- /dev/null +++ b/upcoming-release-notes/1076.md @@ -0,0 +1,6 @@ +--- +category: Maintenance +authors: [TomAFrench] +--- + +Add types to `crdt` directory \ No newline at end of file -- GitLab