Skip to content
This repository has been archived by the owner on Apr 20, 2018. It is now read-only.

Commit

Permalink
Turning hot observables to cold
Browse files Browse the repository at this point in the history
  • Loading branch information
mattpodwysocki committed Dec 28, 2013
1 parent e0f4709 commit dd33b76
Show file tree
Hide file tree
Showing 24 changed files with 330 additions and 345 deletions.
2 changes: 1 addition & 1 deletion rx.aggregates.min.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 59 additions & 62 deletions rx.async.compat.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,35 +109,34 @@
Observable.fromCallback = function (func, scheduler, context, selector) {
scheduler || (scheduler = timeoutScheduler);
return function () {
var args = slice.call(arguments, 0),
subject = new AsyncSubject();
var args = slice.call(arguments, 0);

scheduler.schedule(function () {
function handler(e) {
var results = e;

if (selector) {
try {
results = selector(arguments);
} catch (err) {
subject.onError(err);
return;
return new AnonymousObservable(function (observer) {
return scheduler.schedule(function () {
function handler(e) {
var results = e;

if (selector) {
try {
results = selector(arguments);
} catch (err) {
observer.onError(err);
return;
}
} else {
if (results.length === 1) {
results = results[0];
}
}
} else {
if (results.length === 1) {
results = results[0];
}
}

subject.onNext(results);
subject.onCompleted();
}
observer.onNext(results);
observer.onCompleted();
}

args.push(handler);
func.apply(context, args);
args.push(handler);
func.apply(context, args);
});
});

return subject.asObservable();
};
};

Expand All @@ -152,40 +151,40 @@
Observable.fromNodeCallback = function (func, scheduler, context, selector) {
scheduler || (scheduler = timeoutScheduler);
return function () {
var args = slice.call(arguments, 0),
subject = new AsyncSubject();

scheduler.schedule(function () {
function handler(err) {
if (err) {
subject.onError(err);
return;
}
var args = slice.call(arguments, 0);

var results = slice.call(arguments, 1);
return new AnonymousObservable(function (observer) {
return scheduler.schedule(function () {

if (selector) {
try {
results = selector(results);
} catch (e) {
subject.onError(e);
function handler(err) {
if (err) {
observer.onError(err);
return;
}
} else {
if (results.length === 1) {
results = results[0];

var results = slice.call(arguments, 1);

if (selector) {
try {
results = selector(results);
} catch (e) {
observer.onError(e);
return;
}
} else {
if (results.length === 1) {
results = results[0];
}
}
}

subject.onNext(results);
subject.onCompleted();
}
observer.onNext(results);
observer.onCompleted();
}

args.push(handler);
func.apply(context, args);
args.push(handler);
func.apply(context, args);
});
});

return subject.asObservable();
};
};

Expand Down Expand Up @@ -358,18 +357,16 @@
* @returns {Observable} An Observable sequence which wraps the existing promise success and failure.
*/
Observable.fromPromise = function (promise) {
var subject = new AsyncSubject();

promise.then(
function (value) {
subject.onNext(value);
subject.onCompleted();
},
function (reason) {
subject.onError(reason);
});

return subject.asObservable();
return new AnonymousObservable(function (observer) {
promise.then(
function (value) {
observer.onNext(value);
observer.onCompleted();
},
function (reason) {
observer.onError(reason);
});
});
};
return Rx;
}));
Loading

0 comments on commit dd33b76

Please sign in to comment.