import { captureException } from '../../platform/exceptions';
import * as asyncStorage from '../../platform/server/asyncStorage';
import * as connection from '../../platform/server/connection';
import logger from '../../platform/server/log';
import { sequential, once } from '../../shared/async';
import { setIn, getIn } from '../../shared/util';
import { triggerBudgetChanges, setType as setBudgetType } from '../budget/base';
import {
  serializeClock,
  deserializeClock,
  getClock,
  Timestamp,
  merkle,
} from '../crdt';
import * as db from '../db';
import { PostError, SyncError } from '../errors';
import app from '../main-app';
import { runMutator } from '../mutators';
import { postBinary } from '../post';
import * as prefs from '../prefs';
import { getServer } from '../server-config';
import * as sheet from '../sheet';
import * as undo from '../undo';

import * as encoder from './encoder';
import { rebuildMerkleHash } from './repair';

export { default as makeTestMessage } from './make-test-message';
export { default as resetSync } from './reset';
export { default as repairSync } from './repair';

let FULL_SYNC_DELAY = 1000;
let SYNCING_MODE = 'enabled';

export function setSyncingMode(mode) {
  let prevMode = SYNCING_MODE;

  switch (mode) {
    case 'enabled':
      SYNCING_MODE = 'enabled';
      break;
    case 'offline':
      SYNCING_MODE = 'offline';
      break;
    case 'disabled':
      SYNCING_MODE = 'disabled';
      break;
    case 'import':
      SYNCING_MODE = 'import';
      break;
    default:
      throw new Error('setSyncingMode: invalid mode: ' + mode);
  }

  return prevMode;
}

export function checkSyncingMode(mode) {
  switch (mode) {
    case 'enabled':
      return SYNCING_MODE === 'enabled' || SYNCING_MODE === 'offline';
    case 'disabled':
      return SYNCING_MODE === 'disabled' || SYNCING_MODE === 'import';
    case 'offline':
      return SYNCING_MODE === 'offline';
    case 'import':
      return SYNCING_MODE === 'import';
    default:
      throw new Error('checkSyncingMode: invalid mode: ' + mode);
  }
}

function apply(msg, prev?: unknown) {
  let { dataset, row, column, value } = msg;

  if (dataset === 'prefs') {
    // Do nothing, it doesn't exist in the db
  } else {
    let query;
    try {
      if (prev) {
        query = {
          sql: db.cache(`UPDATE ${dataset} SET ${column} = ? WHERE id = ?`),
          params: [value, row],
        };
      } else {
        query = {
          sql: db.cache(`INSERT INTO ${dataset} (id, ${column}) VALUES (?, ?)`),
          params: [row, value],
        };
      }

      db.runQuery(query.sql, query.params);
    } catch (error) {
      throw new SyncError('invalid-schema', {
        error: { message: error.message, stack: error.stack },
        query,
      });
    }
  }
}
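// For illustration, a call like `fetchAll('accounts', ['a', 'b'])` (table
// and ids hypothetical) builds one batched query per 100 ids, roughly:
//
//   SELECT * FROM accounts WHERE accounts.id = ? OR accounts.id = ?
//
// with the ids bound as parameters.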
// TODO: convert to `whereIn`
async function fetchAll(table, ids) {
  let results = [];
  // was 500, but that caused a stack overflow in Safari
  let batchSize = 100;

  for (let i = 0; i < ids.length; i += batchSize) {
    let partIds = ids.slice(i, i + batchSize);
    let sql;
    let column = `${table}.id`;

    // We have to provide *mapped* data so the spreadsheet works. The
    // functions which trigger budget changes based on data changes assume
    // the data has been mapped. The only mapped data that the budget is
    // concerned about is categories. This is kind of annoying, but we
    // manually map it here.
    if (table === 'transactions') {
      sql = `
        SELECT t.*, c.transferId AS category
        FROM transactions t
        LEFT JOIN category_mapping c ON c.id = t.category
      `;
      column = 't.id';
    } else {
      sql = `SELECT * FROM ${table}`;
    }

    sql += ` WHERE `;
    sql += partIds.map(id => `${column} = ?`).join(' OR ');

    try {
      let rows = await db.runQuery(sql, partIds, true);
      results = results.concat(rows);
    } catch (error) {
      throw new SyncError('invalid-schema', {
        error: { message: error.message, stack: error.stack },
        sql,
        params: partIds,
      });
    }
  }

  return results;
}

export function serializeValue(value) {
  if (value === null) {
    return '0:';
  } else if (typeof value === 'number') {
    return 'N:' + value;
  } else if (typeof value === 'string') {
    return 'S:' + value;
  }

  throw new Error('Unserializable value type: ' + JSON.stringify(value));
}

export function deserializeValue(value) {
  const type = value[0];
  switch (type) {
    case '0':
      return null;
    case 'N':
      return parseFloat(value.slice(2));
    case 'S':
      return value.slice(2);
    default:
  }

  throw new Error('Invalid type key for value: ' + value);
}
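// A quick illustration of the round trip through `serializeValue` and
// `deserializeValue` above (example values hypothetical):
//
//   serializeValue(null);       // => '0:'
//   serializeValue(1.5);        // => 'N:1.5'
//   serializeValue('hello');    // => 'S:hello'
//   deserializeValue('N:1.5');  // => 1.5
//
// The single-character tag preserves type information when values are
// stored as strings in `messages_crdt`.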
let _syncListeners = [];

export function addSyncListener(func) {
  _syncListeners.push(func);

  return () => {
    _syncListeners = _syncListeners.filter(f => f !== func);
  };
}

async function compareMessages(messages) {
  let newMessages = [];

  for (let i = 0; i < messages.length; i++) {
    let message = messages[i];
    let { dataset, row, column, timestamp } = message;
    let timestampStr = timestamp.toString();

    let res = db.runQuery(
      db.cache(
        'SELECT timestamp FROM messages_crdt WHERE dataset = ? AND row = ? AND column = ? AND timestamp >= ?',
      ),
      [dataset, row, column, timestampStr],
      true,
    );

    // A returned message is any one that is "later" than (or the same as)
    // this message: if one exists with the same timestamp, this message has
    // already been applied and is skipped; if one exists with a later
    // timestamp, this message is an old one
    if (res.length === 0) {
      newMessages.push(message);
    } else if (res[0].timestamp !== timestampStr) {
      newMessages.push({ ...message, old: true });
    }
  }

  return newMessages;
}

// This is the fast path `apply` function when in "import" mode. There's no
// need to run through the whole sync system when importing, but **there is
// a caveat**: because we don't run sync listeners, importers should not
// rely on any functions that use any projected state (like rules). We can't
// fire those because they depend on having both old and new data, which we
// don't query here.
function applyMessagesForImport(messages) {
  db.transaction(() => {
    for (let i = 0; i < messages.length; i++) {
      let msg = messages[i];
      let { dataset } = msg;

      if (!msg.old) {
        try {
          apply(msg);
        } catch (e) {
          apply(msg, true);
        }

        if (dataset === 'prefs') {
          throw new Error('Cannot set prefs while importing');
        }
      }
    }
  });
}

type Message = {
  column: string;
  dataset: string;
  old?: unknown;
  row: string;
  timestamp: number;
  value: unknown;
};

export const applyMessages = sequential(async (messages: Message[]) => {
  if (checkSyncingMode('import')) {
    return applyMessagesForImport(messages);
  } else if (checkSyncingMode('enabled')) {
    // Compare the messages with the existing crdt. This filters out
    // already-applied messages and determines if a message is old or not.
    // An "old" message doesn't need to be applied, but it still needs to
    // be put into the merkle trie to maintain the hash.
    messages = await compareMessages(messages);
  }

  messages = [...messages].sort((m1, m2) => {
    let t1 = m1.timestamp ? m1.timestamp.toString() : '';
    let t2 = m2.timestamp ? m2.timestamp.toString() : '';
    if (t1 < t2) {
      return -1;
    } else if (t1 > t2) {
      return 1;
    }
    return 0;
  });

  let idsPerTable = {};
  messages.forEach(msg => {
    if (msg.dataset === 'prefs') {
      return;
    }

    if (idsPerTable[msg.dataset] == null) {
      idsPerTable[msg.dataset] = [];
    }
    idsPerTable[msg.dataset].push(msg.row);
  });

  async function fetchData() {
    let data = new Map();

    for (let table of Object.keys(idsPerTable)) {
      const rows = await fetchAll(table, idsPerTable[table]);

      for (let i = 0; i < rows.length; i++) {
        let row = rows[i];
        setIn(data, [table, row.id], row);
      }
    }

    return data;
  }

  let prefsToSet: Record<string, unknown> = {};
  let oldData = await fetchData();

  undo.appendMessages(messages, oldData);

  // It's important to not mutate the clock while processing the messages.
  // We only want to mutate it if the transaction succeeds. The merkle
  // variable will be updated while applying the messages and we'll apply
  // it afterwards.
  let clock;
  let currentMerkle;
  if (checkSyncingMode('enabled')) {
    clock = getClock();
    currentMerkle = clock.merkle;
  }

  if (sheet.get()) {
    sheet.get().startCacheBarrier();
  }

  // Now that we have all of the data, go through and apply the messages
  // carefully. This transaction is **crucial**: it guarantees that
  // everything is atomically committed to the database, and if any part of
  // it fails, everything aborts and nothing is changed. This is critical
  // to maintain consistency. We also avoid any side effects to in-memory
  // objects, and apply them after this succeeds.
  db.transaction(() => {
    let added = new Set();

    for (let i = 0; i < messages.length; i++) {
      let msg = messages[i];
      let { dataset, row, column, timestamp, value } = msg;

      if (!msg.old) {
        apply(msg, getIn(oldData, [dataset, row]) || added.has(dataset + row));

        if (dataset === 'prefs') {
          prefsToSet[row] = value;
        } else {
          // Keep track of which items have been added in this sync so we
          // know whether they already exist in the db or not. We ignore
          // any changes to the spreadsheet.
          added.add(dataset + row);
        }
      }

      if (checkSyncingMode('enabled')) {
        db.runQuery(
          db.cache(
            `INSERT INTO messages_crdt (timestamp, dataset, row, column, value)
               VALUES (?, ?, ?, ?, ?)`,
          ),
          [timestamp.toString(), dataset, row, column, serializeValue(value)],
        );

        currentMerkle = merkle.insert(currentMerkle, msg.timestamp);
      }
    }

    if (checkSyncingMode('enabled')) {
      currentMerkle = merkle.prune(currentMerkle);

      // Save the clock in the db first (queries might throw exceptions)
      db.runQuery(
        db.cache(
          'INSERT OR REPLACE INTO messages_clock (id, clock) VALUES (1, ?)',
        ),
        [serializeClock({ ...clock, merkle: currentMerkle })],
      );
    }
  });

  if (checkSyncingMode('enabled')) {
    // The transaction succeeded, so we can update in-memory objects now.
    // Update the in-memory clock.
    clock.merkle = currentMerkle;
  }

  // Save any synced prefs
  if (Object.keys(prefsToSet).length > 0) {
    prefs.savePrefs(prefsToSet, { avoidSync: true });

    if (prefsToSet.budgetType) {
      setBudgetType(prefsToSet.budgetType);
    }

    connection.send('prefs-updated');
  }

  let newData = await fetchData();

  // In testing, sometimes the spreadsheet isn't loaded, and that's ok
  if (sheet.get()) {
    // Need to clean up these APIs and make them consistent
    sheet.startTransaction();
    triggerBudgetChanges(oldData, newData);
    sheet.get().triggerDatabaseChanges(oldData, newData);
    sheet.endTransaction();

    // Allow the cache to be used in the future. At this point it's
    // guaranteed to be up-to-date because we are done mutating any other
    // data.
    sheet.get().endCacheBarrier();
  }

  _syncListeners.forEach(func => func(oldData, newData));

  let tables = getTablesFromMessages(messages.filter(msg => !msg.old));
  app.events.emit('sync', {
    type: 'applied',
    tables,
    data: newData,
    prevData: oldData,
  });

  return messages;
});
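// For reference, a message as it arrives on the wire looks roughly like
// this (field values hypothetical; the exact timestamp format comes from
// the crdt module's Timestamp):
//
//   {
//     dataset: 'transactions',
//     row: 'a1b2c3',
//     column: 'amount',
//     value: 'N:1200',
//     timestamp: '2017-01-01T10:00:00.000Z-0000-0123456789ABCDEF',
//   }
//
// Callers below (`syncAndReceiveMessages`, `_fullSync`) convert `value`
// with `deserializeValue` and `timestamp` with `Timestamp.parse` before
// handing messages to `receiveMessages`, which advances the local clock
// via `Timestamp.recv`.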
export function receiveMessages(messages: Message[]) {
  messages.forEach(msg => {
    Timestamp.recv(msg.timestamp);
  });

  return runMutator(() => applyMessages(messages));
}

async function _sendMessages(messages) {
  try {
    await applyMessages(messages);
  } catch (e) {
    if (e instanceof SyncError) {
      if (e.reason === 'invalid-schema') {
        // We know this message came from a local modification, and it
        // couldn't apply, which doesn't make any sense. Must be a bug in
        // the code. Send a specific error type for it for a custom
        // message.
        app.events.emit('sync', {
          type: 'error',
          subtype: 'apply-failure',
          meta: e.meta,
        });
      } else {
        app.events.emit('sync', { type: 'error', meta: e.meta });
      }
    }

    throw e;
  }

  await scheduleFullSync();
}

let IS_BATCHING = false;
let _BATCHED = [];

export async function batchMessages(func) {
  if (IS_BATCHING) {
    await func();
    return;
  }

  IS_BATCHING = true;
  let batched = [];

  try {
    await func();
    // TODO: if it fails, it shouldn't apply them?
  } finally {
    IS_BATCHING = false;
    batched = _BATCHED;
    _BATCHED = [];
  }

  if (batched.length > 0) {
    await _sendMessages(batched);
  }
}

export async function sendMessages(messages) {
  if (IS_BATCHING) {
    _BATCHED = _BATCHED.concat(messages);
  } else {
    return _sendMessages(messages);
  }
}

export function getMessagesSince(since) {
  return db.runQuery(
    'SELECT timestamp, dataset, row, column, value FROM messages_crdt WHERE timestamp > ?',
    [since],
    true,
  );
}

export async function syncAndReceiveMessages(messages, since) {
  let localMessages = await getMessagesSince(since);

  await receiveMessages(
    messages.map(msg => ({
      ...msg,
      value: deserializeValue(msg.value),
      timestamp: Timestamp.parse(msg.timestamp),
    })),
  );

  return localMessages;
}

export function clearFullSyncTimeout() {
  if (syncTimeout) {
    clearTimeout(syncTimeout);
    syncTimeout = null;
  }
}

let syncTimeout = null;
export function scheduleFullSync() {
  clearFullSyncTimeout();

  if (checkSyncingMode('enabled') && !checkSyncingMode('offline')) {
    if (process.env.NODE_ENV === 'test') {
      return fullSync().then(res => {
        if (res.error) {
          throw res.error;
        }
        return res;
      });
    } else {
      syncTimeout = setTimeout(fullSync, FULL_SYNC_DELAY);
    }
  }
}

function getTablesFromMessages(messages) {
  return messages.reduce((acc, message) => {
    let dataset =
      message.dataset === 'schedules_next_date'
        ? 'schedules'
        : message.dataset;

    if (!acc.includes(dataset)) {
      acc.push(dataset);
    }

    return acc;
  }, []);
}
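// A typical use of the batching API above (the message variables are
// hypothetical): every `sendMessages` call made inside the callback is
// collected into one batch, applied together, and followed by a single
// scheduled full sync instead of one per call.
//
//   await batchMessages(async () => {
//     await sendMessages(messagesForTransactionA);
//     await sendMessages(messagesForTransactionB);
//   });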
// This is different from `fullSync` because it waits for the spreadsheet
// to finish any processing. This is useful if we want to perform a full
// sync and wait for everything to finish, usually if you're doing an
// initial sync before working with a file.
export async function initialFullSync() {
  let result = await fullSync();
  if (!result.error) {
    // Make sure to wait for anything in the spreadsheet to process
    await sheet.waitOnSpreadsheet();
  }
}

export const fullSync = once(async function () {
  app.events.emit('sync', { type: 'start' });
  let messages;

  try {
    messages = await _fullSync(null, 0, null);
  } catch (e) {
    console.log(e);

    if (e instanceof SyncError) {
      if (e.reason === 'out-of-sync') {
        captureException(e);
        app.events.emit('sync', {
          type: 'error',
          subtype: 'out-of-sync',
        });
      } else if (e.reason === 'invalid-schema') {
        app.events.emit('sync', {
          type: 'error',
          subtype: 'invalid-schema',
        });
      } else if (
        e.reason === 'decrypt-failure' ||
        e.reason === 'encrypt-failure'
      ) {
        app.events.emit('sync', {
          type: 'error',
          subtype: e.reason,
          meta: e.meta,
        });
      } else if (e.reason === 'beta-version') {
        app.events.emit('sync', {
          type: 'error',
          subtype: e.reason,
        });
      } else {
        app.events.emit('sync', { type: 'error' });
      }
    } else if (e instanceof PostError) {
      console.log(e);
      if (e.reason === 'unauthorized') {
        app.events.emit('sync', { type: 'unauthorized' });

        // Set the user into read-only mode
        asyncStorage.setItem('readOnly', 'true');
      } else if (e.reason === 'network-failure') {
        app.events.emit('sync', { type: 'error', subtype: 'network' });
      } else {
        app.events.emit('sync', { type: 'error', subtype: e.reason });
      }
    } else {
      captureException(e);
      // TODO: Send the message to the client and allow them to expand &
      // view it
      app.events.emit('sync', { type: 'error' });
    }

    return { error: { message: e.message, reason: e.reason, meta: e.meta } };
  }

  let tables = getTablesFromMessages(messages);
  app.events.emit('sync', {
    type: 'success',
    tables,
    syncDisabled: checkSyncingMode('disabled'),
  });
  return { messages };
});
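// The core sync round trip below, sketched at a high level (step numbers
// are informal):
//
//   1. Collect local messages since the given timestamp (or the last
//      synced timestamp, defaulting to 5 minutes ago).
//   2. POST them to the sync server and apply the messages it returns.
//   3. Diff the merkle tries; a non-null diff time means client and server
//      still disagree, so recurse from that point with a bumped counter.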
async function _fullSync(sinceTimestamp, count, prevDiffTime) {
  let { cloudFileId, groupId, lastSyncedTimestamp } = prefs.getPrefs() || {};

  clearFullSyncTimeout();

  if (checkSyncingMode('disabled') || checkSyncingMode('offline')) {
    return [];
  }

  // Snapshot the point at which we are currently syncing
  let currentTime = getClock().timestamp.toString();

  let since =
    sinceTimestamp ||
    lastSyncedTimestamp ||
    // Default to 5 minutes ago
    new Timestamp(Date.now() - 5 * 60 * 1000, 0, '0').toString();

  let messages = getMessagesSince(since);

  let userToken = await asyncStorage.getItem('user-token');

  logger.info(
    'Syncing since',
    since,
    messages.length,
    '(attempt: ' + count + ')',
  );

  let buffer = await encoder.encode(groupId, cloudFileId, since, messages);

  // TODO: There's a limit on how many messages we can send because of the
  // payload size. Right now it's 20MB on the server. We should check the
  // worst case here and make multiple requests if it's really large.
  let resBuffer = await postBinary(getServer().SYNC_SERVER + '/sync', buffer, {
    'X-ACTUAL-TOKEN': userToken,
  });

  // Abort if the file is no longer loaded, or if the group id has changed
  // because of a sync reset
  if (!prefs.getPrefs() || prefs.getPrefs().groupId !== groupId) {
    return [];
  }

  let res = await encoder.decode(resBuffer);

  logger.info('Got messages from server', res.messages.length);

  let localTimeChanged = getClock().timestamp.toString() !== currentTime;

  // Apply the new messages
  let receivedMessages: unknown[] = [];
  if (res.messages.length > 0) {
    receivedMessages = await receiveMessages(
      res.messages.map(msg => ({
        ...msg,
        value: deserializeValue(msg.value),
        timestamp: Timestamp.parse(msg.timestamp),
      })),
    );
  }

  let diffTime = merkle.diff(res.merkle, getClock().merkle);

  if (diffTime !== null) {
    // This is a bit wonky, but we loop until we are in sync with the
    // server. While syncing, either the client or server could change out
    // from under us, so it might take a couple passes to completely sync
    // up. This is a check that stops the loop in case we are corrupted and
    // can't sync up. We try 10 times if we keep getting the same diff
    // time, and add an upper limit of 100 no matter what (just to stop
    // this from ever being an infinite loop).
    //
    // It's slightly possible for the user to add more messages while we
    // are in `receiveMessages`, but `localTimeChanged` would still be
    // false. In that case, we don't reset the counter, but it should be
    // very unlikely that this happens enough to hit the loop limit.
    if ((count >= 10 && diffTime === prevDiffTime) || count >= 100) {
      logger.info('SENT -------');
      logger.info(JSON.stringify(messages));
      logger.info('RECEIVED -------');
      logger.info(JSON.stringify(res.messages));

      let rebuiltMerkle = rebuildMerkleHash();

      console.log(
        count,
        'messages:',
        messages.length,
        messages.length > 0 ? messages[0] : null,
        'res.messages:',
        res.messages.length,
        res.messages.length > 0 ? res.messages[0] : null,
        'clientId',
        getClock().timestamp.node(),
        'groupId',
        groupId,
        'diffTime:',
        diffTime,
        diffTime === prevDiffTime,
        'local clock:',
        getClock().timestamp.toString(),
        getClock().merkle.hash,
        'rebuilt hash:',
        rebuiltMerkle.numMessages,
        rebuiltMerkle.trie.hash,
        'server hash:',
        res.merkle.hash,
        'localTimeChanged:',
        localTimeChanged,
      );

      if (rebuiltMerkle.trie.hash === res.merkle.hash) {
        // Rebuilding the merkle worked... but why?
        let clocks = await db.all('SELECT * FROM messages_clock');
        if (clocks.length !== 1) {
          console.log('Bad number of clocks:', clocks.length);
        }
        let hash = deserializeClock(clocks[0].clock).merkle.hash;
        console.log('Merkle hash in db:', hash);
      }

      throw new SyncError('out-of-sync');
    }

    receivedMessages = receivedMessages.concat(
      await _fullSync(
        new Timestamp(diffTime, 0, '0').toString(),
        // If something local changed while we were syncing, always reset
        // the counter. We never want to think syncing failed because we
        // tried syncing many times and couldn't, when really the user kept
        // changing stuff in the middle of syncing.
        localTimeChanged ? 0 : count + 1,
        diffTime,
      ),
    );
  } else {
    // All synced up; store the current time as a simple optimization for
    // the next sync
    await prefs.savePrefs({
      lastSyncedTimestamp: getClock().timestamp.toString(),
    });
  }

  return receivedMessages;
}