Skip to content

Update user logging sequence and make collections deal with reactivity better #111

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 18 commits into from
Mar 17, 2023
Merged
5,879 changes: 891 additions & 4,988 deletions package-lock.json

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -29,7 +29,6 @@
"dependencies": {
"@meteorrn/minimongo": "1.0.1",
"ejson": "2.2.3",
"prop-types": "^15.5.10",
"wolfy87-eventemitter": "4.3.0"
},
"devDependencies": {
@@ -51,14 +50,14 @@
"sinon": "12.0.1"
},
"optionalDependencies": {
"@react-native-async-storage/async-storage": ">=1.8.1"
"@react-native-community/async-storage": ">=1.12.1"
},
"peerDependencies": {
"@react-native-community/netinfo": "*",
"react": "*",
"react-native": ">= 0.60.0"
"react-native": "*"
},
"engines": {
"node": ">= 10.x"
"node": ">= 14.x"
}
}
106 changes: 97 additions & 9 deletions src/Collection.js
Original file line number Diff line number Diff line change
@@ -6,31 +6,54 @@ import call from './Call';
import { hasOwn, isPlainObject } from '../lib/utils.js';

const observers = {};

export const runObservers = (type, collection, newDocument, oldDocument) => {
const observersByComp = {};
/**
* Get the list of callbacks for changes on a collection
* @param {string} type - Type of change happening.
* @param {string} collection - Collection it has happened on
* @param {string} newDocument - New value of item in the colleciton
*/
export function getObservers(type, collection, newDocument) {
let observersRet = [];
if (observers[collection]) {
observers[collection].forEach(({ cursor, callbacks }) => {
if (callbacks[type]) {
if (type === 'removed') {
callbacks['removed'](newDocument);
observersRet.push(callbacks['removed']);
} else if (
Data.db[collection].findOne({
$and: [{ _id: newDocument._id }, cursor._selector],
})
) {
try {
callbacks[type](newDocument, oldDocument);
observersRet.push(callbacks[type]);
} catch (e) {
// TODO we should optionally allow an onError callback
console.error('Error in observe callback', e);
console.error('Error in observe callback old', e);
}
} else {
// TODO what to do here?
}
}
});
}
};
// Find the observers related to the specific query
if (observersByComp[collection]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add one or more lines of comments:

// what this block does
// why it's needed

let keys = Object.keys(observersByComp[collection]);
for (let i = 0; i < keys.length; i++) {
observersByComp[collection][keys[i]].callbacks.forEach(
({ cursor, callback }) => {
let findRes = Data.db[collection].findOne({
$and: [{ _id: newDocument?._id }, cursor._selector],
});
if (findRes) {
observersRet.push(callback);
}
}
);
}
}
return observersRet;
}

const _registerObserver = (collection, cursor, callbacks) => {
observers[collection] = observers[collection] || [];
@@ -103,8 +126,44 @@ export class Collection {
} else {
docs = this._collection.find(selector, options);
}
result = new Cursor(
this,
docs,
typeof selector == 'string' ? { _id: selector } : selector
);

// If this is being called within a use tracker
// make the tracker computation to say if this
// collection is changed it needs to be re-run
if (Tracker.active && Tracker.currentComputation) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// what this block does
// why it's needed

let id = Tracker.currentComputation._id;
observersByComp[this._name] = observersByComp[this._name] || {};
if (!observersByComp[this._name][id]) {
let item = {
computation: Tracker.currentComputation,
callbacks: [],
};
observersByComp[this._name][id] = item;
}
let item = observersByComp[this._name][id];

item.callbacks.push({
cursor: result,
callback: (newVal, old) => {
if (old && EJSON.equals(newVal, old)) {
return;
}

item.computation.invalidate();
},
});

result = new Cursor(this, docs, selector);
Tracker.onInvalidate(() => {
if (observersByComp[this._name][id]) {
delete observersByComp[this._name][id];
}
});
}

return result;
}
@@ -153,6 +212,15 @@ export class Collection {
});
});
}
// Notify relevant observers that the item has been updated with its new value
let observers = getObservers('added', this._collection.name, item);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// what this block does
// why it's needed

observers.forEach((callback) => {
try {
callback(item, undefined);
} catch (e) {
console.error('Error in observe callback', e);
}
});

return id;
}
@@ -161,7 +229,7 @@ export class Collection {
if (typeof options == 'function') {
callback = options;
}

let old = this._collection.get(id);
if (!this._collection.get(id))
return callback({
error: 409,
@@ -184,6 +252,16 @@ export class Collection {
});
});
}
let newItem = this._collection.findOne({ _id: id });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// what this block does
// why it's needed

// Notify relevant observers that the item has been updated with its new value
let observers = getObservers('changed', this._collection.name, newItem);
observers.forEach((callback) => {
try {
callback(newItem, old);
} catch (e) {
console.error('Error in observe callback', e);
}
});
}

remove(id, callback = () => {}) {
@@ -203,6 +281,16 @@ export class Collection {
});
});
}

// Load the observers for removing the element
let observers = getObservers('removed', this._collection.name, element);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// what this block does
// why it's needed

observers.forEach((callback) => {
try {
callback(element);
} catch (e) {
console.error('Error in observe callback', e);
}
});
} else {
callback(`No document with _id : ${id}`);
}
14 changes: 9 additions & 5 deletions src/Data.js
Original file line number Diff line number Diff line change
@@ -71,11 +71,15 @@ export default {
);
},
notify(eventName) {
this._cbs.map((cb) => {
if (cb.eventName == eventName && typeof cb.callback == 'function') {
cb.callback();
}
});
// Notifify that changes have been made
// Put in timeout so it doesn't block main thread
setTimeout(() => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// what this block does
// why it's needed

this._cbs.map((cb) => {
if (cb.eventName == eventName && typeof cb.callback == 'function') {
cb.callback();
}
});
}, 1);
},
waitDdpConnected(cb) {
if (this.ddp && this.ddp.status == 'connected') {
69 changes: 58 additions & 11 deletions src/Meteor.js
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ import Random from '../lib/Random';

import Data from './Data';
import Mongo from './Mongo';
import { Collection, runObservers, localCollections } from './Collection';
import { Collection, getObservers, localCollections } from './Collection';
import call from './Call';

import withTracker from './components/withTracker';
@@ -20,6 +20,7 @@ const Meteor = {
enableVerbose() {
isVerbose = true;
},
_reactiveDict: new ReactiveDict(),
Random,
Mongo,
Tracker,
@@ -36,13 +37,15 @@ const Meteor = {
},
status() {
return {
connected: Data.ddp ? Data.ddp.status == 'connected' : false,
connected: !!this._reactiveDict.get('connected'),
status: Data.ddp ? Data.ddp.status : 'disconnected',
//retryCount: 0
//retryTime:
//reason:
};
},

removing: {},
call: call,
disconnect() {
if (Data.ddp) {
@@ -53,8 +56,14 @@ const Meteor = {
for (var i in Data.subscriptions) {
const sub = Data.subscriptions[i];
Data.ddp.unsub(sub.subIdRemember);
this.removing[sub.subIdRemember] = true;
sub.subIdRemember = Data.ddp.sub(sub.name, sub.params);
}
// If we get a double restart, make sure we keep track and
// remove it later
Object.keys(this.removing).forEach((item) => {
Data.ddp.unsub(item);
});
},
waitDdpConnected: Data.waitDdpConnected.bind(Data),
reconnect() {
@@ -64,7 +73,7 @@ const Meteor = {
return {
AsyncStorage:
Data._options.AsyncStorage ||
require('@react-native-async-storage/async-storage').default,
require('@react-native-community/async-storage').default,
};
},
connect(endpoint, options) {
@@ -82,7 +91,7 @@ const Meteor = {

if (!options.AsyncStorage) {
const AsyncStorage =
require('@react-native-async-storage/async-storage').default;
require('@react-native-community/async-storage').default;

if (AsyncStorage) {
options.AsyncStorage = AsyncStorage;
@@ -104,6 +113,7 @@ const Meteor = {

try {
const NetInfo = require('@react-native-community/netinfo').default;
// Reconnect if we lose internet
NetInfo.addEventListener(
({ type, isConnected, isInternetReachable, isWifiEnabled }) => {
if (isConnected && Data.ddp.autoReconnect) {
@@ -128,18 +138,22 @@ const Meteor = {
}
}

Data.notify('change');

if (isVerbose) {
console.info('Connected to DDP server.');
}
this._loadInitialUser().then(() => {
this._subscriptionsRestart();
});
this._reactiveDict.set('connected', true);
this.connected = true;
Data.notify('change');
});

let lastDisconnect = null;
Data.ddp.on('disconnected', () => {
this.connected = false;
this._reactiveDict.set('connected', false);

Data.notify('change');

if (isVerbose) {
@@ -165,8 +179,14 @@ const Meteor = {
};

Data.db[message.collection].upsert(document);

runObservers('added', message.collection, document);
let observers = getObservers('added', message.collection, document);
observers.forEach((callback) => {
try {
callback(document, null);
} catch (e) {
console.error('Error in observe callback', e);
}
});
});

Data.ddp.on('ready', (message) => {
@@ -206,8 +226,14 @@ const Meteor = {
});

Data.db[message.collection].upsert(document);

runObservers('changed', message.collection, document, oldDocument);
let observers = getObservers('changed', message.collection, document);
observers.forEach((callback) => {
try {
callback(document, oldDocument);
} catch (e) {
console.error('Error in observe callback', e);
}
});
}
});

@@ -216,8 +242,19 @@ const Meteor = {
const oldDocument = Data.db[message.collection].findOne({
_id: message.id,
});
let observers = getObservers(
'removed',
message.collection,
oldDocument
);
Data.db[message.collection].del(message.id);
runObservers('removed', message.collection, oldDocument);
observers.forEach((callback) => {
try {
callback(null, oldDocument);
} catch (e) {
console.error('Error in observe callback', e);
}
});
}
});
Data.ddp.on('result', (message) => {
@@ -231,13 +268,19 @@ const Meteor = {
});

Data.ddp.on('nosub', (message) => {
if (this.removing[message.id]) {
delete this.removing[message.id];
}
for (var i in Data.subscriptions) {
const sub = Data.subscriptions[i];
if (sub.subIdRemember == message.id) {
console.warn('No subscription existing for', sub.name);
}
}
});
Data.ddp.on('error', (message) => {
console.warn(message);
});
},
subscribe(name) {
let params = Array.prototype.slice.call(arguments, 1);
@@ -359,6 +402,10 @@ const Meteor = {
}
});
});
} else {
if (Data.subscriptions[id]) {
Data.subscriptions[id].inactive = true;
}
}

return handle;
Loading