// @ts-strict-ignore
import {
  serializeClock,
  deserializeClock,
  getClock,
  Timestamp,
  merkle,
} from '@actual-app/crdt';

import { captureException } from '../../platform/exceptions';
import * as asyncStorage from '../../platform/server/asyncStorage';
import * as connection from '../../platform/server/connection';
import { logger } from '../../platform/server/log';
import { sequential, once } from '../../shared/async';
import { setIn, getIn } from '../../shared/util';
import { LocalPrefs } from '../../types/prefs';
import { triggerBudgetChanges, setType as setBudgetType } from '../budget/base';
import * as db from '../db';
import { PostError, SyncError } from '../errors';
import { app } from '../main-app';
import { runMutator } from '../mutators';
import { postBinary } from '../post';
import * as prefs from '../prefs';
import { getServer } from '../server-config';
import * as sheet from '../sheet';
import * as undo from '../undo';

import * as encoder from './encoder';
import { rebuildMerkleHash } from './repair';
import { isError } from './utils';

export { makeTestMessage } from './make-test-message';
export { resetSync } from './reset';
export { repairSync } from './repair';

const FULL_SYNC_DELAY = 1000;
type SyncingMode = 'enabled' | 'offline' | 'disabled' | 'import';
let SYNCING_MODE: SyncingMode = 'enabled';

export function setSyncingMode(mode: SyncingMode) {
  const prevMode = SYNCING_MODE;
  switch (mode) {
    case 'enabled':
      SYNCING_MODE = 'enabled';
      break;
    case 'offline':
      SYNCING_MODE = 'offline';
      break;
    case 'disabled':
      SYNCING_MODE = 'disabled';
      break;
    case 'import':
      SYNCING_MODE = 'import';
      break;
    default:
      throw new Error('setSyncingMode: invalid mode: ' + mode);
  }
  return prevMode;
}
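
// Illustrative usage (not part of the original source): since
// `setSyncingMode` returns the previous mode, it is easy to switch modes
// temporarily and restore the old one afterwards:
//
//   const prevMode = setSyncingMode('import');
//   try {
//     // ... fast-path message application ...
//   } finally {
//     setSyncingMode(prevMode);
//   }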

export function checkSyncingMode(mode: SyncingMode): boolean {
  switch (mode) {
    case 'enabled':
      return SYNCING_MODE === 'enabled' || SYNCING_MODE === 'offline';
    case 'disabled':
      return SYNCING_MODE === 'disabled' || SYNCING_MODE === 'import';
    case 'offline':
      return SYNCING_MODE === 'offline';
    case 'import':
      return SYNCING_MODE === 'import';
    default:
      throw new Error('checkSyncingMode: invalid mode: ' + mode);
  }
}
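
// Note the overlap above: checking 'enabled' also matches 'offline' (local
// changes are still recorded in the crdt; only the network sync is skipped),
// and checking 'disabled' also matches 'import'. For example:
//
//   setSyncingMode('offline');
//   checkSyncingMode('enabled'); // true
//   checkSyncingMode('offline'); // true
//   checkSyncingMode('import');  // false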

function apply(msg: Message, prev?: boolean) {
  const { dataset, row, column, value } = msg;

  if (dataset === 'prefs') {
    // Do nothing, it doesn't exist in the db
  } else {
    let query;
    try {
      if (prev) {
        query = {
          sql: `UPDATE ${dataset} SET ${column} = ? WHERE id = ?`,
          params: [value, row],
        };
      } else {
        query = {
          sql: `INSERT INTO ${dataset} (id, ${column}) VALUES (?, ?)`,
          params: [row, value],
        };
      }

      db.runQuery(db.cache(query.sql), query.params);
    } catch (error) {
      throw new SyncError('invalid-schema', {
        error: { message: error.message, stack: error.stack },
        query,
      });
    }
  }
}
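
// Note (illustrative, not from the original comments): `apply` works as a
// minimal upsert. Callers pass a truthy `prev` when the row already exists
// (see `applyMessages`, which consults pre-fetched `oldData`), so the change
// becomes an UPDATE; otherwise it's an INSERT of a fresh row with a single
// column set.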

// TODO: convert to `whereIn`
async function fetchAll(table: string, ids: string[]) {
  let results = [];

  // was 500, but that caused a stack overflow in Safari
  const batchSize = 100;

  for (let i = 0; i < ids.length; i += batchSize) {
    const partIds = ids.slice(i, i + batchSize);
    let sql;
    let column = `${table}.id`;

    // We have to provide *mapped* data so the spreadsheet works. The
    // functions that trigger budget changes based on data changes assume
    // the data has been mapped. The only mapped data the budget cares
    // about is categories, so we manually map it here
    if (table === 'transactions') {
      sql = `
        SELECT t.*, c.transferId AS category
        FROM transactions t
        LEFT JOIN category_mapping c ON c.id = t.category
      `;
      column = 't.id';
    } else {
      sql = `SELECT * FROM ${table}`;
    }

    sql += ` WHERE `;
    sql += partIds.map(() => `${column} = ?`).join(' OR ');

    try {
      const rows = await db.runQuery(sql, partIds, true);
      results = results.concat(rows);
    } catch (error) {
      throw new SyncError('invalid-schema', {
        error: {
          message: error.message,
          stack: error.stack,
        },
        query: { sql, params: partIds },
      });
    }
  }

  return results;
}
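
// For example (illustrative; `payees` stands in for any non-transactions
// table), a batch of ids ['a', 'b'] produces:
//
//   SELECT * FROM payees WHERE payees.id = ? OR payees.id = ?
//   -- params: ['a', 'b']
//
// while `transactions` uses the category-mapped SELECT above with
// `t.id = ?` predicates instead.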

export function serializeValue(value: string | number | null): string {
  if (value === null) {
    return '0:';
  } else if (typeof value === 'number') {
    return 'N:' + value;
  } else if (typeof value === 'string') {
    return 'S:' + value;
  }

  throw new Error('Unserializable value type: ' + JSON.stringify(value));
}

export function deserializeValue(value: string): string | number | null {
  const type = value[0];
  switch (type) {
    case '0':
      return null;
    case 'N':
      return parseFloat(value.slice(2));
    case 'S':
      return value.slice(2);
    default:
  }

  throw new Error('Invalid type key for value: ' + value);
}
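
// Illustrative round trips through the tag-prefix encoding above (these
// calls are examples, not part of the module):
//
//   serializeValue(null);        // => '0:'
//   serializeValue(1.5);         // => 'N:1.5'
//   serializeValue('hello');     // => 'S:hello'
//   deserializeValue('N:1.5');   // => 1.5
//   deserializeValue('S:hello'); // => 'hello'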

// TODO make this type stricter.
type DataMap = Map<string, unknown>;
type SyncListener = (oldData: DataMap, newData: DataMap) => unknown;
let _syncListeners: SyncListener[] = [];

export function addSyncListener(func: SyncListener) {
  _syncListeners.push(func);

  return () => {
    _syncListeners = _syncListeners.filter(f => f !== func);
  };
}
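
// Illustrative usage (the listener body is hypothetical): the returned
// function unsubscribes, so callers keep it around for cleanup:
//
//   const unlisten = addSyncListener((oldData, newData) => {
//     // react to applied changes, e.g. invalidate an in-memory cache
//   });
//   // ... later ...
//   unlisten();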

async function compareMessages(messages: Message[]): Promise<Message[]> {
  const newMessages = [];

  for (let i = 0; i < messages.length; i++) {
    const message = messages[i];
    const { dataset, row, column, timestamp } = message;
    const timestampStr = timestamp.toString();

    const res = db.runQuery(
      db.cache(
        'SELECT timestamp FROM messages_crdt WHERE dataset = ? AND row = ? AND column = ? AND timestamp >= ?',
      ),
      [dataset, row, column, timestampStr],
      true,
    );

    // The query returns any message at least as recent as this one. If a
    // result exists, this message is either a duplicate (same timestamp,
    // skipped entirely) or old (a newer message exists for this field)
    if (res.length === 0) {
      newMessages.push(message);
    } else if (res[0].timestamp !== timestampStr) {
      newMessages.push({ ...message, old: true });
    }
  }

  return newMessages;
}
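
// For example (illustrative): suppose the crdt already holds a message for
// (transactions, row1, notes) at timestamp T5. An incoming message for the
// same field at T3 comes back marked `old: true` (it still has to go into
// the merkle trie, but won't be applied); one at T7 comes back as-is and
// wins; one at exactly T5 is a duplicate and is dropped entirely.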

// This is the fast-path `apply` function used in "import" mode. There's
// no need to run through the whole sync system when importing, but
// **there is a caveat**: because we don't run sync listeners, importers
// should not rely on any functions that use projected state (like
// rules). We can't fire those because they depend on having both old
// and new data, which we don't query here
function applyMessagesForImport(messages: Message[]): void {
  db.transaction(() => {
    for (let i = 0; i < messages.length; i++) {
      const msg = messages[i];
      const { dataset } = msg;

      if (!msg.old) {
        try {
          apply(msg);
        } catch (e) {
          apply(msg, true);
        }

        if (dataset === 'prefs') {
          throw new Error('Cannot set prefs while importing');
        }
      }
    }
  });
}

export type Message = {
  column: string;
  dataset: string;
  old?: unknown;
  row: string;
  timestamp: Timestamp;
  value: string | number | null;
};
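
// An illustrative message (all field values are hypothetical; the
// Timestamp constructor signature matches its use in `_fullSync` below):
//
//   const msg: Message = {
//     dataset: 'transactions',
//     row: 'a1b2c3d4',
//     column: 'notes',
//     value: 'groceries',   // serialized via serializeValue on insert
//     timestamp: new Timestamp(Date.now(), 0, 'client-node-id'),
//   };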

export const applyMessages = sequential(async (messages: Message[]) => {
  if (checkSyncingMode('import')) {
    applyMessagesForImport(messages);
    return undefined;
  } else if (checkSyncingMode('enabled')) {
    // Compare the messages with the existing crdt. This filters out
    // already applied messages and determines if a message is old or
    // not. An "old" message doesn't need to be applied, but it still
    // needs to be put into the merkle trie to maintain the hash.
    messages = await compareMessages(messages);
  }

  messages = [...messages].sort((m1, m2) => {
    const t1 = m1.timestamp ? m1.timestamp.toString() : '';
    const t2 = m2.timestamp ? m2.timestamp.toString() : '';
    if (t1 < t2) {
      return -1;
    } else if (t1 > t2) {
      return 1;
    }
    return 0;
  });

  const idsPerTable = {};
  messages.forEach(msg => {
    if (msg.dataset === 'prefs') {
      return;
    }

    if (idsPerTable[msg.dataset] == null) {
      idsPerTable[msg.dataset] = [];
    }
    idsPerTable[msg.dataset].push(msg.row);
  });

  async function fetchData(): Promise<DataMap> {
    const data = new Map();

    for (const table of Object.keys(idsPerTable)) {
      const rows = await fetchAll(table, idsPerTable[table]);

      for (let i = 0; i < rows.length; i++) {
        const row = rows[i];
        setIn(data, [table, row.id], row);
      }
    }

    return data;
  }

  const prefsToSet: LocalPrefs = {};
  const oldData = await fetchData();

  undo.appendMessages(messages, oldData);

  // It's important to not mutate the clock while processing the
  // messages. We only want to mutate it if the transaction succeeds.
  // The merkle variable will be updated while applying the messages and
  // we'll apply it afterwards.
  let clock;
  let currentMerkle;
  if (checkSyncingMode('enabled')) {
    clock = getClock();
    currentMerkle = clock.merkle;
  }

  if (sheet.get()) {
    sheet.get().startCacheBarrier();
  }

  // Now that we have all of the data, go through and apply the
  // messages carefully. This transaction is **crucial**: it
  // guarantees that everything is atomically committed to the
  // database, and if any part of it fails everything aborts and
  // nothing is changed. This is critical to maintain consistency. We
  // also avoid any side effects to in-memory objects, and apply them
  // after this succeeds.
  db.transaction(() => {
    const added = new Set();

    for (const msg of messages) {
      const { dataset, row, column, timestamp, value } = msg;

      if (!msg.old) {
        apply(msg, getIn(oldData, [dataset, row]) || added.has(dataset + row));

        if (dataset === 'prefs') {
          prefsToSet[row] = value;
        } else {
          // Keep track of which items have been added in this sync so
          // `apply` knows whether they already exist in the db or not.
          // We ignore any changes to the spreadsheet.
          added.add(dataset + row);
        }
      }

      if (checkSyncingMode('enabled')) {
        db.runQuery(
          db.cache(`INSERT INTO messages_crdt (timestamp, dataset, row, column, value)
           VALUES (?, ?, ?, ?, ?)`),
          [timestamp.toString(), dataset, row, column, serializeValue(value)],
        );

        currentMerkle = merkle.insert(currentMerkle, timestamp);
      }

      // Special treatment for some synced prefs
      if (dataset === 'preferences' && row === 'budgetType') {
        setBudgetType(value);
      }
    }

    if (checkSyncingMode('enabled')) {
      currentMerkle = merkle.prune(currentMerkle);

      // Save the clock in the db first (queries might throw
      // exceptions)
      db.runQuery(
        db.cache(
          'INSERT OR REPLACE INTO messages_clock (id, clock) VALUES (1, ?)',
        ),
        [serializeClock({ ...clock, merkle: currentMerkle })],
      );
    }
  });

  if (checkSyncingMode('enabled')) {
    // The transaction succeeded, so we can update in-memory objects
    // now. Update the in-memory clock.
    clock.merkle = currentMerkle;
  }

  // Save any synced prefs
  if (Object.keys(prefsToSet).length > 0) {
    prefs.savePrefs(prefsToSet, { avoidSync: true });
    connection.send('prefs-updated');
  }

  const newData = await fetchData();

  // In testing, sometimes the spreadsheet isn't loaded, and that's ok
  if (sheet.get()) {
    // Need to clean up these APIs and make them consistent
    sheet.startTransaction();
    triggerBudgetChanges(oldData, newData);
    sheet.get().triggerDatabaseChanges(oldData, newData);
    sheet.endTransaction();

    // Allow the cache to be used in the future. At this point it's guaranteed
    // to be up-to-date because we are done mutating any other data
    sheet.get().endCacheBarrier();
  }

  _syncListeners.forEach(func => func(oldData, newData));

  const tables = getTablesFromMessages(messages.filter(msg => !msg.old));
  app.events.emit('sync', {
    type: 'applied',
    tables,
    data: newData,
    prevData: oldData,
  });

  return messages;
});

export function receiveMessages(messages: Message[]): Promise<Message[]> {
  messages.forEach(msg => {
    Timestamp.recv(msg.timestamp);
  });

  return runMutator(() => applyMessages(messages));
}
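
// Note: `Timestamp.recv` merges each remote timestamp into the local
// hybrid logical clock before the messages are applied, so any message
// generated locally after this point is guaranteed to sort after
// everything received here.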

async function _sendMessages(messages: Message[]): Promise<void> {
  try {
    await applyMessages(messages);
  } catch (e) {
    if (e instanceof SyncError) {
      if (e.reason === 'invalid-schema') {
        // We know this message came from a local modification, and it
        // couldn't apply, which doesn't make any sense. Must be a bug
        // in the code. Send a specific error type for it for a custom
        // message.
        app.events.emit('sync', {
          type: 'error',
          subtype: 'apply-failure',
          meta: e.meta,
        });
      } else {
        app.events.emit('sync', { type: 'error', meta: e.meta });
      }
    }

    throw e;
  }

  await scheduleFullSync();
}

let IS_BATCHING = false;
let _BATCHED: Message[] = [];
export async function batchMessages(func: () => Promise<void>): Promise<void> {
  if (IS_BATCHING) {
    await func();
    return;
  }

  IS_BATCHING = true;
  let batched: Message[] = [];

  try {
    await func();
    // TODO: if it fails, it shouldn't apply them?
  } finally {
    IS_BATCHING = false;
    batched = _BATCHED;
    _BATCHED = [];
  }

  if (batched.length > 0) {
    await _sendMessages(batched);
  }
}
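
// Illustrative usage (the message arrays are hypothetical): every
// `sendMessages` call inside the callback is collected and flushed as a
// single `_sendMessages` call when the outermost batch finishes; nested
// `batchMessages` calls are flattened by the IS_BATCHING check above:
//
//   await batchMessages(async () => {
//     await sendMessages(accountMessages);
//     await sendMessages(transactionMessages);
//   });
//   // => one applyMessages pass and one scheduled full sync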

export async function sendMessages(messages: Message[]) {
  if (IS_BATCHING) {
    _BATCHED = _BATCHED.concat(messages);
  } else {
    return _sendMessages(messages);
  }
}

export function getMessagesSince(since: string): Message[] {
  return db.runQuery(
    'SELECT timestamp, dataset, row, column, value FROM messages_crdt WHERE timestamp > ?',
    [since],
    true,
  );
}

let syncTimeout = null;

export function clearFullSyncTimeout(): void {
  if (syncTimeout) {
    clearTimeout(syncTimeout);
    syncTimeout = null;
  }
}

export function scheduleFullSync():
  | Promise<{ messages: Message[] } | { error: unknown }>
  | undefined {
  clearFullSyncTimeout();

  if (checkSyncingMode('enabled') && !checkSyncingMode('offline')) {
    if (process.env.NODE_ENV === 'test') {
      return fullSync().then(res => {
        if (isError(res)) {
          throw res.error;
        }
        return res;
      });
    } else {
      syncTimeout = setTimeout(fullSync, FULL_SYNC_DELAY);
    }
  }
}

function getTablesFromMessages(messages: Message[]): string[] {
  return messages.reduce((acc, message) => {
    const dataset =
      message.dataset === 'schedules_next_date' ? 'schedules' : message.dataset;

    if (!acc.includes(dataset)) {
      acc.push(dataset);
    }
    return acc;
  }, []);
}
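
// For example (illustrative), messages touching `transactions`,
// `schedules_next_date`, and `transactions` again reduce to
// ['transactions', 'schedules']: `schedules_next_date` is folded into
// `schedules` and duplicates are dropped.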

// This is different than `fullSync` because it waits for the
// spreadsheet to finish any processing. This is useful if we want to
// perform a full sync and wait for everything to finish, usually if
// you're doing an initial sync before working with a file.
export async function initialFullSync(): Promise<{
  error?: { message: string; reason: string; meta: unknown };
}> {
  const result = await fullSync();
  if (isError(result)) {
    // Make sure to wait for anything in the spreadsheet to process
    await sheet.waitOnSpreadsheet();
    return result;
  }
  return {};
}

export const fullSync = once(async function (): Promise<
  | { messages: Message[] }
  | { error: { message: string; reason: string; meta: unknown } }
> {
  app.events.emit('sync', { type: 'start' });
  let messages;

  try {
    messages = await _fullSync(null, 0, null);
  } catch (e) {
    console.log(e);

    if (e instanceof SyncError) {
      if (e.reason === 'out-of-sync') {
        captureException(e);

        app.events.emit('sync', {
          type: 'error',
          subtype: 'out-of-sync',
          meta: e.meta,
        });
      } else if (e.reason === 'invalid-schema') {
        app.events.emit('sync', {
          type: 'error',
          subtype: 'invalid-schema',
          meta: e.meta,
        });
      } else if (
        e.reason === 'decrypt-failure' ||
        e.reason === 'encrypt-failure'
      ) {
        app.events.emit('sync', {
          type: 'error',
          subtype: e.reason,
          meta: e.meta,
        });
      } else {
        app.events.emit('sync', { type: 'error', meta: e.meta });
      }
    } else if (e instanceof PostError) {
      console.log(e);
      if (e.reason === 'unauthorized') {
        app.events.emit('sync', { type: 'unauthorized' });

        // Set the user into read-only mode
        asyncStorage.setItem('readOnly', 'true');
      } else if (e.reason === 'network-failure') {
        app.events.emit('sync', { type: 'error', subtype: 'network' });
      } else {
        app.events.emit('sync', { type: 'error', subtype: e.reason });
      }
    } else {
      captureException(e);
      // TODO: Send the message to the client and allow them to expand & view it
      app.events.emit('sync', { type: 'error' });
    }

    return { error: { message: e.message, reason: e.reason, meta: e.meta } };
  }

  const tables = getTablesFromMessages(messages);

  app.events.emit('sync', {
    type: 'success',
    tables,
    syncDisabled: checkSyncingMode('disabled'),
  });
  return { messages };
});

async function _fullSync(
  sinceTimestamp: string | null,
  count: number,
  prevDiffTime: number | null,
): Promise<Message[]> {
  const { cloudFileId, groupId, lastSyncedTimestamp } = prefs.getPrefs() || {};

  clearFullSyncTimeout();

  if (checkSyncingMode('disabled') || checkSyncingMode('offline')) {
    return [];
  }

  // Snapshot the point at which we are currently syncing
  const currentTime = getClock().timestamp.toString();

  const since =
    sinceTimestamp ||
    lastSyncedTimestamp ||
    // Default to 5 minutes ago
    new Timestamp(Date.now() - 5 * 60 * 1000, 0, '0').toString();

  const messages = getMessagesSince(since);

  const userToken = await asyncStorage.getItem('user-token');

  logger.info(
    'Syncing since',
    since,
    messages.length,
    '(attempt: ' + count + ')',
  );

  const buffer = await encoder.encode(groupId, cloudFileId, since, messages);

  // TODO: There is a limit on how many messages we can send because of
  // the payload size. Right now it's 20MB on the server. We should
  // check the worst case here and make multiple requests if it's
  // really large.
  const resBuffer = await postBinary(
    getServer().SYNC_SERVER + '/sync',
    buffer,
    {
      'X-ACTUAL-TOKEN': userToken,
    },
  );

  // Abort if the file is no longer loaded or the group id has changed
  // because of a sync reset
  if (!prefs.getPrefs() || prefs.getPrefs().groupId !== groupId) {
    return [];
  }

  const res = await encoder.decode(resBuffer);

  logger.info('Got messages from server', res.messages.length);

  const localTimeChanged = getClock().timestamp.toString() !== currentTime;

  // Apply the new messages
  let receivedMessages: Message[] = [];
  if (res.messages.length > 0) {
    receivedMessages = await receiveMessages(
      res.messages.map(msg => ({
        ...msg,
        value: deserializeValue(msg.value as string),
      })),
    );
  }

  const diffTime = merkle.diff(res.merkle, getClock().merkle);

  if (diffTime !== null) {
    // This is a bit wonky, but we loop until we are in sync with the
    // server. While syncing, either the client or server could change
    // out from under us, so it might take a couple passes to
    // completely sync up. This check stops the loop in case we are
    // corrupted and can't sync up: we bail after 10 passes that keep
    // returning the same diff time, and after 100 passes no matter
    // what (just to stop this from ever being an infinite loop).
    //
    // It's slightly possible for the user to add more messages while we
    // are in `receiveMessages`, but `localTimeChanged` would still be
    // false. In that case we don't reset the counter, but it should be
    // very unlikely that this happens enough to hit the loop limit.

    if ((count >= 10 && diffTime === prevDiffTime) || count >= 100) {
      logger.info('SENT -------');
      logger.info(JSON.stringify(messages));
      logger.info('RECEIVED -------');
      logger.info(JSON.stringify(res.messages));

      const rebuiltMerkle = rebuildMerkleHash();

      console.log(
        count,
        'messages:',
        messages.length,
        messages.length > 0 ? messages[0] : null,
        'res.messages:',
        res.messages.length,
        res.messages.length > 0 ? res.messages[0] : null,
        'clientId',
        getClock().timestamp.node(),
        'groupId',
        groupId,
        'diffTime:',
        diffTime,
        diffTime === prevDiffTime,
        'local clock:',
        getClock().timestamp.toString(),
        getClock().merkle.hash,
        'rebuilt hash:',
        rebuiltMerkle.numMessages,
        rebuiltMerkle.trie.hash,
        'server hash:',
        res.merkle.hash,
        'localTimeChanged:',
        localTimeChanged,
      );

      if (rebuiltMerkle.trie.hash === res.merkle.hash) {
        // Rebuilding the merkle worked... but why?
        const clocks = await db.all('SELECT * FROM messages_clock');
        if (clocks.length !== 1) {
          console.log('Bad number of clocks:', clocks.length);
        }
        const hash = deserializeClock(clocks[0]).merkle.hash;
        console.log('Merkle hash in db:', hash);
      }

      throw new SyncError('out-of-sync');
    }

    receivedMessages = receivedMessages.concat(
      await _fullSync(
        new Timestamp(diffTime, 0, '0').toString(),
        // If something local changed while we were syncing, always
        // reset the counter. We never want to conclude that syncing
        // failed just because we retried many times; the user may
        // simply have kept changing things in the middle of syncing.
        localTimeChanged ? 0 : count + 1,
        diffTime,
      ),
    );
  } else {
    // All synced up, store the current time as a simple optimization for the next sync
    const requiresUpdate =
      getClock().timestamp.toString() !== lastSyncedTimestamp;

    if (requiresUpdate) {
      await prefs.savePrefs({
        lastSyncedTimestamp: getClock().timestamp.toString(),
      });
    }
  }

  return receivedMessages;
}