rxjs: QueueScheduler schedule method: this.SchedulerAction is not a constructor

RxJS version: 5.0.0-beta11

Actual behavior: I tried to use the beta 11 today in the project I’m working on, and the QueueSchduler schedule method raises the following error: TypeError: this.SchedulerAction is not a constructor.

Code to reproduce: Actually it seems that the SchedulerAction is not defined. The error is raised by running the schedule method of an instance we create:

scheduler = new QueueScheduler();
scheduler.schedule(() => someSubject.next(someState), 0)

Additional information: By reading the Scheduler class, I found out the constructor does not set the SchedulerAction attributes:

constructor(private SchedulerAction: typeof Action,
              now: () => number = Scheduler.now) {
    this.now = now;
  }

I may missed something, but is the following line not missing ?

this.SchedulerAction = SchedulerAction;

Also, the QueueScheduler class could reuse the Scheduler constructor with QueueAction by default ?

// QueueScheduler.ts
export class QueueScheduler extends AsyncScheduler {
  constructor(private SchedulerAction: typeof Action = QueueAction) {
    super(SchedulerAction);
  }
}

// queue.ts
export const queue = new QueueScheduler();

Im also wondering if I’m doing wrong when instantiating a QueueScheduler, maybe only use Scheduler.queue could be sufficient…

Thanks for this great repo !

++

Augustin

About this issue

  • Original URL
  • State: closed
  • Created 8 years ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

@trxcllnt yeah, I think we should pair program on the VirtualTimeScheduler stuff because I think there is a lot of value to it to have a more complete swappable clock. I think you captured the essence in that we need the basics.

The scheduler itself should be initialized with an initial clock value, as well as a comparer for values to determine whether we need to increment the clock. We made the virtual time scheduler overly generic and I’m not sure it’s super necessary to have all the ceremony we had before.

The following methods should be supported:

  1. sleep - Advance the clock without running any of the scheduled work
  2. advanceTo - Advance the clock to an absolute time and run all scheduled work
  3. advanceBy - Advance the clock by a relative time and run all scheduled work
  4. schedule -Obviously to schedule the work
  5. start - Starts the scheduler
  6. stop - Stops the scheduler

Why don’t we get started on this, so we could add some nicer stuff for historical scheduling as well?

@mattpodwysocki yes! happy to be moving forward on this. just pushed a virtual-time-schedulers branch for us to collaborate.

IIRC the thing we’ve been calling the VirtualTimeScheduler was hastily implemented to support writing tests as marble diagrams, but nobody’s had the time (heh) to carry it to the finish line.

I was hoping at some point @mattpodwysocki would want to collaborate on the remaining Scheduler features 😃. Though seeing as how I’ve mostly failed to communicate the requirements and designs of the new Schedulers in detail, I’ll take the blame for dropping the ball here. So in the interest of at least getting this info down somewhere, the rest of this post will be all the what and why details I can remember about the new Schedulers. @jayphelps hopefully this will also help you if you’re on Scheduler maintenance duty.

A Scheduler is a sorted queue of things to do. Just like all the other versions of Rx, it schedules each thing to do as a disposable Action, so an Action extends Subscription (aka Disposable). The Scheduler constructors accept an Action class. When you call schedule(), the Scheduler creates and returns you an instance of this class.

To schedule something to happen in the future, we started with the simplest API we could (but no simpler), initially only supporting relative due times:

const action = Scheduler.schedule(
  function work(this: Action<T>, state?: any) {
    /* do some stuff now */
    state.count += 1;
    if (state.count < 10) {
      /* do this again later */
      this.schedule(
        state /* <-- next state */ ,
        200 /* <-- next due time (or period) */
      );
    }
  },
  100 /* <-- initial due time */,
  { count: 0 } /* <-- initial state */
);

/* some time later... */
action.unsubscribe();

We identified disposable and closure allocation, closure invocation, and async request call patterns as the major performance bottlenecks, but we wanted to keep the familiar recursive scheduling API and reentrancy behaviors. Thus, the Schedulers were factored into two separate components, for performance and extensibility:

  1. Queue ordering, insertion, and removal – handled by the Actions
  2. Queue flushing and reentrancy control – handled by the Schedulers

The Action instance is responsible for updating the shared queue of the Scheduler it was created by. An Action is responsible for insertion, removal, ordering, and flushing its Scheduler’s queue. This may initially seem counter-intuitive; we wouldn’t typically allow a queue to be updated by the things that go in it. However, this approach allowed us to make a number of key performance improvements while keeping the core, familiar recursive scheduling API:

  1. Rescheduled Actions created by each of the production Schedulers (Async, Queue, ASAP, rAF) are recycled. For example, one Action is created per subscription to an Observable.interval(0), no matter how many periods elapse. This can dramatically reduce memory pressure/GC cost in scenarios with many concurrent periodically scheduled Actions. VirtualActions (created by the VTScheduler) are immutable, because they need to be inspected by the TestScheduler to ensure each execution of work does what it’s supposed to.
  2. Recycling the Action instances effectively mimics TCO across async boundaries. We don’t create any intermediate closures to accommodate a recursive API, so it’s faster both to [re]schedule and execute an Action, regardless of whether the environment supports native TCO or not.
  3. Actions initially created by one of the specialized trampolining Schedulers (Queue, ASAP, rAF) can be rescheduled with a different due time, will still be recycled, and will executed by the Async Scheduler. It can also choose to reschedule itself again with a 0 due time, and it will be run by its original Scheduler.
  4. Async Actions can be scheduled via one setInterval instead of serial setTimeout calls. This is less stressful on the VM for Actions with stable periods, but also ensures singly-executed Actions are consistently invoked in order with respect to other Actions (both periodic or otherwise). The VM invokes setInterval callbacks as close to the time they should execute, adjusting for time lost in long frames under load, providing a more accurate picture of “real” time.
  5. For the specialized trampoline Schedulers (ASAP, rAF, and in the future, requestIdleScheduler), multiple actions scheduled in the same execution of the event-loop will only request a single “next tick,” at which time they’ll all be executed in order. This is better for the VM (for example, there’s only ever one “requestAnimationFrame” handler to invoke), and also gives us the flexibility to sort and prioritize the Actions that will run on the next tick. This is a critical feature in advanced rendering pipelines that need to order scheduled render triggers for layout and repaint (something that comes up in my work these days).

When it’s time to flush a Scheduler’s queue, the Scheduler executes each Action, handling errors and reentrancy. For example, the Async and Queue Schedulers guard against recursion by pushing rescheduled actions to the end of the queue and unwinding the stack. This is identical to previous versions of Rx. By default, the production Schedulers stop executing if an Action returns an Error, and disposes the rest of the scheduled Actions. There’s probably room to experiment here.

Early on we made a decision to switch the default scheduling semantics from trampolining (aka CurrentThreadScheduler) to recursive (aka RecursiveScheduler). However instead of implementing a new RecursiveScheduler, RxJava proved that defaulting to iterative loops in the absence of a Scheduler yields identical call patterns, and is dramatically faster in most cases.

So to finish the VTScheduler, we need to add a public API for start, stop, sleep, sorting and comparing time, scheduling by absolute time, and advancing by relative and absolute time (instead of count). @mattpodwysocki did I miss anything?