
How To Allow Web Workers To Receive New Data While It Is Still Performing Computation?

I want to sort an array using Web Workers. But this array might receive new values over time, while the worker is still performing the sort function. So my question is: how can I let the worker receive these new values while it is still sorting?

Solution 1:

Even though the Worker runs on another thread than your main page, and can thus run continuously without blocking the UI, it still runs on a single thread.

This means that until your sort algorithm has finished, the Worker will delay the execution of the message event handler; it is just as blocked as the main thread would be.

Even if you used another Worker from inside this worker, the problem would be the same.
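The blocking behaviour is easy to reproduce outside a Worker. In this sketch (runnable in Node 15+ or a browser console; all names are made up for illustration), a MessageChannel stands in for the Worker's message channel and a busy-wait loop stands in for the sort: the message is posted before the loop starts, yet its handler only runs after the loop has finished.

```javascript
const order = [];
const { port1, port2 } = new MessageChannel();

// Stands in for the Worker's onmessage handler.
port2.onmessage = () => {
  order.push('message handled');
  console.log(order.join(' -> ')); // sort finished -> message handled
  port1.close(); // release the channel (also lets Node exit)
};

port1.postMessage('new data'); // the "new data" is sent right away...

// ...but a long synchronous loop (standing in for the sort) keeps the
// thread busy, so the handler above cannot run until the loop is over.
const end = Date.now() + 100;
while (Date.now() < end) {
  // busy-wait for about 100 ms
}
order.push('sort finished');
```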

The only solution would be to write the sorter as a generator function and make it yield every now and then, so that pending events get a chance to run.

But doing this will drastically slow down your sorting algorithm.
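A minimal sketch of the generator idea, kept outside any Worker so it can run anywhere: `bubbleSortSteps` pauses after every swap, and the hypothetical helper `runSteps` advances it by a bounded number of steps. In a Worker, the gap between two `runSteps` calls is where incoming messages would get handled; running several steps per call instead of one helps limit the slowdown.

```javascript
// Bubble sort written as a generator: it pauses (yields) after every swap,
// so the caller decides how much work is done before taking control back.
function* bubbleSortSteps(a) {
  let swapped;
  do {
    swapped = false;
    // a.length is re-read on every iteration, so values pushed while the
    // sort is paused are picked up by the remaining passes.
    for (let i = 0; i < a.length - 1; i++) {
      if (a[i] > a[i + 1]) {
        [a[i], a[i + 1]] = [a[i + 1], a[i]];
        swapped = true;
        yield; // pause here
      }
    }
  } while (swapped);
}

// Advances the sorter by at most `maxSteps` steps.
// Returns true once the sort has finished.
function runSteps(sorter, maxSteps) {
  for (let n = 0; n < maxSteps; n++) {
    if (sorter.next().done) return true;
  }
  return false;
}

// Usage: a new value arrives while the sort is paused.
const list = [5, 3, 8, 1];
const sorter = bubbleSortSteps(list);
runSteps(sorter, 2); // partial progress
list.push(0);        // new value received mid-sort
while (!runSteps(sorter, 2)) {
  // other work (e.g. message handling) could happen here
}
console.log(list.join(', ')); // 0, 1, 3, 5, 8
```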

To make it better, you could try to hook into each Event Loop iteration thanks to a MessageChannel object: you post a message on one port and receive it on the other port in a subsequent Event Loop iteration. If you post again every time you receive it, you have your own hook into every Event Loop iteration.

Now, the best would be to run a good batch of steps in each of these Event Loop iterations, but for the demo I'll advance our generator function (which I borrowed from this Q/A) by only one step at a time:
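One way to sketch such a hook, assuming an environment where `MessageChannel` is a global (browsers, Workers, Node 15+): the hypothetical `driveEveryTask` advances any iterator by one batch per message, and posts a new message to itself until the iterator is exhausted. Because each batch runs in its own task, other queued tasks (in a Worker, its `message` events) can run between batches.

```javascript
// Advances `iterator` by `batchSize` steps per task. A message posted on
// port1 is delivered to port2 as a new task, which acts as the "hook".
function driveEveryTask(iterator, batchSize = 1000) {
  return new Promise((resolve) => {
    const { port1, port2 } = new MessageChannel();
    port2.onmessage = () => {
      for (let n = 0; n < batchSize; n++) {
        if (iterator.next().done) {
          port1.close(); // release the channel (also lets Node exit)
          resolve();
          return;
        }
      }
      port1.postMessage(null); // schedule the next batch in a later task
    };
    port1.postMessage(null); // kick off the first batch
  });
}

// Hypothetical usage: a generator doing 10000 small steps.
function* countTo(limit, out) {
  for (let i = 1; i <= limit; i++) {
    out.total += i;
    yield;
  }
}

const result = { total: 0 };
driveEveryTask(countTo(10000, result), 500).then(() => {
  console.log(result.total); // 50005000
});
```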

const worker = new Worker(getWorkerURL());
worker.onmessage = draw;

onclick = e => worker.postMessage(0x0000FF/0xFFFFFF); // add a red pixel

// every frame we request the current state from Worker
function requestFrame() {
  worker.postMessage('gimme a frame');
  requestAnimationFrame(requestFrame);
}
requestFrame();

// drawing part
const ctx = canvas.getContext('2d');
const img = ctx.createImageData(50, 50);
const data = new Uint32Array(img.data.buffer);
ctx.imageSmoothingEnabled = false;

function draw(evt) {
  // converts 0&1 to black and white pixels
  const list = evt.data;
  list.forEach((bool, i) =>
    data[i] = (bool * 0xFFFFFF) + 0xFF000000
  );
  ctx.setTransform(1,0,0,1,0,0);
  ctx.clearRect(0,0,canvas.width,canvas.height);
  ctx.putImageData(img,0,0);
  // draw bigger
  ctx.scale(5,5);
  ctx.drawImage(canvas, 0,0);
}

function getWorkerURL() {
  const script = document.querySelector('[type="worker-script"]');
  const blob = new Blob([script.textContent]);
  return URL.createObjectURL(blob);
}
body{
  background: ivory;
}
<script type="worker-script">
// our list
const list = Array.from({length: 2500}).map(_=>+(Math.random()>.5));
// our sorter generator
let sorter = bubbleSort(list);
let done = false;
/* inner messaging channel */
const msg_channel = new MessageChannel();
// Hook to every Event loop
msg_channel.port2.onmessage = e => {
  // proceed to the next step of the sorting algorithm
  // (this could be a few thousand steps in a loop)
  const state = sorter.next();
  // while running
  if(!state.done) {
    msg_channel.port1.postMessage('');
    done = false;
  }
  else {
    done = true;
  }
}
msg_channel.port1.postMessage("");

/* outer messaging channel (from main) */
self.onmessage = e => {
  if(e.data === "gimme a frame") {
    self.postMessage(list);
  }
  else {
    list.push(e.data);
    if(done) { // restart the sorter
      done = false; // don't restart it again before the next step runs
      sorter = bubbleSort(list);
      msg_channel.port1.postMessage('');
    }
  }
};

function* bubbleSort(a) { // * is magic
  var swapped;
  do {
    swapped = false;
    for (var i = 0; i < a.length - 1; i++) {
      if (a[i] > a[i + 1]) {
        var temp = a[i];
        a[i] = a[i + 1];
        a[i + 1] = temp;
        swapped = true;
        yield swapped; // pause here
      }
    }
  } while (swapped);
}
</script>
<pre> click to add red pixels</pre>
<canvas id="canvas" width="250" height="250"></canvas>

Note that the same can be achieved with an async function, which may be more practical in some cases:

const worker = new Worker(getWorkerURL());
worker.onmessage = draw;

onclick = e => worker.postMessage(0x0000FF/0xFFFFFF); // add a red pixel

// every frame we request the current state from Worker
function requestFrame() {
  worker.postMessage('gimme a frame');
  requestAnimationFrame(requestFrame);
}
requestFrame();

// drawing part
const ctx = canvas.getContext('2d');
const img = ctx.createImageData(50, 50);
const data = new Uint32Array(img.data.buffer);
ctx.imageSmoothingEnabled = false;

function draw(evt) {
  // converts 0&1 to black and white pixels
  const list = evt.data;
  list.forEach((bool, i) =>
    data[i] = (bool * 0xFFFFFF) + 0xFF000000
  );
  ctx.setTransform(1,0,0,1,0,0);
  ctx.clearRect(0,0,canvas.width,canvas.height);
  ctx.putImageData(img,0,0);
  // draw bigger
  ctx.scale(5,5);
  ctx.drawImage(canvas, 0,0);
}

function getWorkerURL() {
  const script = document.querySelector('[type="worker-script"]');
  const blob = new Blob([script.textContent]);
  return URL.createObjectURL(blob);
}
body{
  background: ivory;
}
<script type="worker-script">
// our list
const list = Array.from({length: 2500}).map(_=>+(Math.random()>.5));
// is the sorter idle? (set by bubbleSort() when it finishes)
let done = false;


/* outer messaging channel (from main) */
self.onmessage = e => {
  if(e.data === "gimme a frame") {
    self.postMessage(list);
  }
  else {
    list.push(e.data);
    if(done) { // restart the sorter
      done = false; // bubbleSort() sets it back to true when it finishes
      bubbleSort(list);
    }
  }
};

async function bubbleSort(a) { // async is magic
  var swapped;
  do {
    swapped = false;
    for (var i = 0; i < a.length - 1; i++) {
      if (a[i] > a[i + 1]) {
        const temp = a[i];
        a[i] = a[i + 1];
        a[i + 1] = temp;
        swapped = true;
      }
      if( i % 50 === 0 ) { // by batches of 50?
        await waitNextTask(); // pause here
      }
    }
  } while (swapped);
  done = true;
}

function waitNextTask() {
  return new Promise( (resolve) => {
    const channel = waitNextTask.channel ||= new MessageChannel();
    channel.port1.addEventListener("message", (evt) => resolve(), { once: true });
    channel.port2.postMessage("");
    channel.port1.start();
  });
}

bubbleSort(list);
</script>
<pre> click to add red pixels</pre>
<canvas id="canvas" width="250" height="250"></canvas>

Solution 2:

You can do it with a trick: interrupting your function with setTimeout. For example, without an additional thread it is not possible to execute 2 functions in parallel, but with the setTimeout interruption trick we can interleave them as follows:

Example of interleaved execution of two functions

var count_0 = 0,
    count_1 = 0;

function func_0()
{
    if(count_0 < 3)
        setTimeout(func_0, 0); // the same: setTimeout(func_0);

    console.log('count_0 = '+count_0);
    count_0++;
}

function func_1()
{
    if(count_1 < 3)
        setTimeout(func_1, 0);

    console.log('count_1 = '+count_1);
    count_1++;
}

func_0();
func_1();

You will get this output:

count_0 = 0
count_1 = 0
count_0 = 1
count_1 = 1
count_0 = 2
count_1 = 2
count_0 = 3
count_1 = 3

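Why is this possible? Because setTimeout does not run its callback right away: it schedules the callback as a new task, which only runs once the currently running code has finished. So each function does one step, schedules its next step and returns, and the other function's scheduled step gets its turn in between.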

Solution for you

In this case you have to write your own array sort function (or you can use the following function of mine), because the native sort function cannot be interrupted. Inside your own function you use the setTimeout interruption trick, so that your message event handler can run between two chunks of the sort.

In the following example I interrupt the sort once, when half of the passes are left; you can change that if you want.

Example with custom sort function interrupting

var numbers = [4, 2, 1, 3, 5];

// this is my bubble sort function with interruption
/**
 * Sorts an array in place (you get the same array back, sorted).
 * @param {number[]} arr – array to sort
 * @param {number} dir – if dir = -1 you will get an array like [5,4,3,2,1]
 *                 and if dir = 1 the opposite direction, like [1,2,3,4,5]
 * @param {number} [passCount] – used only internally, for the setTimeout interruption trick
 */
function sortNumbersWithInterruption(arr, dir, passCount)
{
    var firstCall = passCount === undefined,
        passes = firstCall ? arr.length : passCount,
        halfOfArrayLength = (arr.length / 2) | 0; // for ex. 2.5 | 0 = 2

    // Why we need the while loop: a value at the end of the array may need
    // several passes until it has moved all the way to the front.
    while(passes--)
    {
        if(firstCall && passes == halfOfArrayLength)
        {
            // if you want, you can omit the following line to stop sorting completely
            setTimeout(function(){sortNumbersWithInterruption(arr, dir, passes)}, 0);
            /*
                Place 1: you can do anything you want here.
            */
            break;
        }

        for(var i = 0; i < arr.length - 1; i++)
        {
            var a = arr[i],
                b = arr[i+1];

            if((a - b) * dir > 0)
            {
                arr[i] = b;
                arr[i+1] = a;
            }
        }

        console.log('array is: ' + arr.join());
    }

    if(passCount !== undefined)
        console.log('END array is: ' + arr.join());
}

sortNumbersWithInterruption(numbers, -1); // without passCount parameter
/*
    Place 2: you can do anything you want here.
*/
console.log('The execution is here now!');

You will get this output:

array is: 4,2,3,5,1
array is: 4,3,5,2,1
The execution is here now!
array is: 4,5,3,2,1
array is: 5,4,3,2,1
END array is: 5,4,3,2,1
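A variant of the same trick, sketched under the assumption that an ascending sort is wanted and that you want to interrupt every few passes rather than only once: the hypothetical `sortInChunks` runs `passesPerChunk` bubble-sort passes, schedules the rest with setTimeout, and returns a Promise that resolves with the sorted array. Between two chunks, other events (such as a Worker's message handler) can run.

```javascript
// Sorts `arr` in place (ascending) with bubble sort, `passesPerChunk`
// passes at a time, yielding to the event loop with setTimeout between chunks.
function sortInChunks(arr, passesPerChunk = 10) {
  return new Promise((resolve) => {
    function chunk() {
      for (let p = 0; p < passesPerChunk; p++) {
        let swapped = false;
        for (let i = 0; i < arr.length - 1; i++) {
          if (arr[i] > arr[i + 1]) {
            [arr[i], arr[i + 1]] = [arr[i + 1], arr[i]];
            swapped = true;
          }
        }
        if (!swapped) {
          resolve(arr); // a pass without swaps: the array is sorted
          return;
        }
      }
      setTimeout(chunk, 0); // let pending events run, then continue
    }
    chunk();
  });
}

sortInChunks([4, 2, 1, 3, 5], 1).then((sorted) => {
  console.log('sorted: ' + sorted.join()); // sorted: 1,2,3,4,5
});
console.log('The execution is here now!'); // printed before "sorted: ..."
```

Stopping as soon as a pass makes no swap also means that an already sorted array costs a single pass.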

Solution 3:

You can do it with insertion sort (kind of). Here is the idea:

  1. Start your worker with an internal empty array (an empty array is obviously sorted)

  2. Your worker receives only the new elements, not the entire array

  3. Your worker inserts each received element directly at its correct position in the array

  4. Every n seconds, the worker posts a message with the current array if it has changed since the last message. (If you prefer, you can send the array on every insertion, but it is more efficient to buffer the updates somehow.)
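The steps above can be sketched as a small store, written without any Worker-specific API so it runs anywhere. `createSortedStore`, `add` and `flush` are hypothetical names; inside a real Worker you would call `add` from `onmessage` and pass `(list) => postMessage(list)` as the `send` callback, calling `flush` for example from a `setInterval`. For brevity, this sketch finds the insertion point with a linear scan.

```javascript
// Keeps an always-sorted array; values are inserted one at a time, and the
// array is only sent when it has changed since the last flush.
function createSortedStore(send) {
  const list = [];   // step 1: start from an empty (hence sorted) array
  let dirty = false; // has the array changed since the last send?

  return {
    // steps 2-3: receive a single value and insert it at its sorted position
    add(value) {
      let i = list.findIndex((x) => x > value); // first element greater than value
      if (i === -1) i = list.length;            // none: append at the end
      list.splice(i, 0, value);
      dirty = true;
    },
    // step 4: called periodically; sends a copy only if something changed
    flush() {
      if (!dirty) return false;
      send(list.slice());
      dirty = false;
      return true;
    },
  };
}

// Usage outside a Worker: collect what would have been posted.
const sent = [];
const store = createSortedStore((copy) => sent.push(copy));
[5, 1, 4].forEach((v) => store.add(v));
store.flush(); // sends [1, 4, 5]
store.flush(); // nothing changed: sends nothing
store.add(2);
store.flush(); // sends [1, 2, 4, 5]
console.log(sent.length); // 2
```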

Eventually you get the entire array, and whenever an item is added, you will receive the updated array too.

NOTE: Because your array is always sorted, you can find the correct position for each insertion using binary search. This is very efficient.
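A minimal sketch of that binary-search insertion; `sortedIndex` and `insertSorted` are hypothetical helper names. Finding the position takes O(log n) comparisons, although `splice` still has to shift the elements after the insertion point.

```javascript
// Returns the first index whose element is greater than `value`
// (so equal values keep their arrival order). `arr` must be sorted ascending.
function sortedIndex(arr, value) {
  let lo = 0;
  let hi = arr.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1; // unsigned shift: integer midpoint
    if (arr[mid] <= value) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

// Inserts `value` into the sorted array `arr`, keeping it sorted.
function insertSorted(arr, value) {
  arr.splice(sortedIndex(arr, value), 0, value);
  return arr;
}

const sortedList = [];
[7, 3, 9, 3, 1].forEach((v) => insertSorted(sortedList, v));
console.log(sortedList.join()); // 1,3,3,7,9
```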

Solution 4:

There are two decent options.

Option 1: Worker.terminate()

The first is just to kill your existing web worker and start a new one. For that you can use Worker.terminate().

The terminate() method of the Worker interface immediately terminates the Worker. This does not offer the worker an opportunity to finish its operations; it is simply stopped at once.

The only downsides of this approach are:

  • You lose all worker state. If you had to copy a lot of data into it for the request, you have to do it all again.
  • It involves thread creation and destruction, which isn't as slow as most people think, but if you terminate web workers a lot it might cause issues.

If neither of those is an issue, it is probably the easiest option.
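A minimal main-thread sketch of Option 1, assuming the Worker script (here a hypothetical `sort-worker.js`) sorts whatever it receives and posts the result back. `createRestartingSorter` is a hypothetical helper that takes a factory function, so it can also be tried outside a browser with a fake worker.

```javascript
// Keeps at most one worker running: every new request terminates the
// current worker (losing its state) and starts a fresh one.
function createRestartingSorter(createWorker, onResult) {
  let worker = null;
  return function sort(data) {
    if (worker) worker.terminate(); // stop the outdated computation at once
    const current = createWorker();
    worker = current;
    current.onmessage = (event) => {
      // ignore a late result from a worker that has since been replaced
      if (current === worker) onResult(event.data);
    };
    current.postMessage(data); // the full data has to be sent again
  };
}

// In a browser (hypothetical script name):
//   const sort = createRestartingSorter(() => new Worker('sort-worker.js'), console.log);

// Usage outside a browser, with a fake worker that records calls.
const created = [];
const fakeWorker = () => {
  const w = {
    terminated: false,
    onmessage: null,
    terminate() { w.terminated = true; },
    postMessage(data) { w.received = data; },
  };
  created.push(w);
  return w;
};
const results = [];
const sort = createRestartingSorter(fakeWorker, (r) => results.push(r));
sort([3, 1, 2]);
sort([3, 1, 2, 0]);                      // the first worker is terminated
created[0].onmessage({ data: 'stale' }); // late result: ignored
created[1].onmessage({ data: [0, 1, 2, 3] });
console.log(created[0].terminated, results.length); // true 1
```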

In my case I have lots of state. My worker is rendering part of an image, and when the user pans to a different area I want it to stop what it is doing and start rendering the new area. But the data needed to render the image is pretty huge.

In your case, the state is your (presumably huge) list, which you don't want to lose.

Option 2: Yielding

The second option is basically to do cooperative multitasking. You run your computation as normal, but every now and then you pause (yield) and ask "should I stop?", like this (this is for some nonsense calculation, not sorting):

let requestId = 0;

onmessage = event => {
  ++requestId;
  sortAndSendData(requestId, event.data);
}

function sortAndSendData(thisRequestId, data) {
  let isSorted = false;
  let total = 0;

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    // Check if we are still the current request ID.
    if (thisRequestId !== requestId) {
      // Data was changed. Cancel this sort.
      return;
    }
  }

  postMessage(total);
}

This won't work though, because sortAndSendData() runs to completion and blocks the web worker's event loop. We need some way to yield just before checking thisRequestId !== requestId. Unfortunately, JavaScript doesn't quite have a "yield to the event loop" method. It does have async/await, so we might try this:

let requestId = 0;

onmessage = event => {
  console.log("Got event", event);
  ++requestId;
  sortAndSendData(requestId, event.data);
}

async function sortAndSendData(thisRequestId, data) {
  let isSorted = false;
  let total = 0;

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    await Promise.resolve();

    // Check if we are still the current request ID.
    if (thisRequestId !== requestId) {
      console.log("Cancelled!");
      // Data was changed. Cancel this sort.
      return;
    }
  }

  postMessage(total);
}

Unfortunately it doesn't work. The code after an await resumes in a "microtask", and the microtask queue is always drained before the event loop moves on to the next "macrotask" (such as our web worker message), so the message is never handled while the loop is running.
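A small experiment, runnable in Node or a browser console, illustrates this ordering: a timer callback (a macrotask) is queued first, yet a loop that only awaits already-resolved promises finishes before the timer gets to run. `demoMicrotaskStarvation` is a hypothetical name.

```javascript
// Shows that `await Promise.resolve()` never lets a queued macrotask
// (here a setTimeout callback) run: the microtask queue is drained first.
async function demoMicrotaskStarvation(iterations = 1000) {
  const order = [];
  setTimeout(() => order.push('timeout'), 0); // queued macrotask

  for (let i = 0; i < iterations; i++) {
    await Promise.resolve(); // only yields to other microtasks
  }
  order.push('loop done');

  // Now actually yield to the macrotask queue so the timer can fire.
  await new Promise((resolve) => setTimeout(resolve, 0));
  return order;
}

demoMicrotaskStarvation().then((order) => console.log(order.join(' -> ')));
// loop done -> timeout
```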

We need to force our await to become a macrotask, which you can do using setTimeout(0):

let requestId = 0;

onmessage = event => {
  console.log("Got event", event);
  ++requestId;
  sortAndSendData(requestId, event.data);
}

function yieldToMacrotasks() {
  return new Promise((resolve) => setTimeout(resolve));
}

async function sortAndSendData(thisRequestId, data) {
  let isSorted = false;
  let total = 0;

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    await yieldToMacrotasks();

    // Check if we are still the current request ID.
    if (thisRequestId !== requestId) {
      console.log("Cancelled!");
      // Data was changed. Cancel this sort.
      return;
    }
  }

  postMessage(total);
}

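This works! However, it is extremely slow: await yieldToMacrotasks() takes approximately 4 ms on my machine with Chrome! This is because browsers enforce a minimum delay on setTimeout(0): the HTML specification clamps it to at least 4 ms once timers are nested more than 5 levels deep (as they are here, where each timer callback leads to the next setTimeout call), and browsers may apply their own rules on top of that.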

Fortunately, another user pointed me to a quicker way: sending a message through a MessageChannel also yields to the event loop, but isn't subject to the minimum delay that setTimeout(0) is. This code works, and each loop iteration only takes ~0.04 ms, which should be fine.

let currentTask = {
  cancelled: false,
}

onmessage = event => {
  currentTask.cancelled = true;
  currentTask = {
    cancelled: false,
  };
  performComputation(currentTask, event.data);
}

async function performComputation(task, data) {
  let total = 0;

  let promiseResolver;

  const channel = new MessageChannel();
  channel.port2.onmessage = event => {
    promiseResolver();
  };

  while (data !== 0) {
    // Do a little bit of computation.
    total += data;
    --data;

    // Yield to the event loop.
    const promise = new Promise(resolve => {
      promiseResolver = resolve;
    });
    channel.port1.postMessage(null);
    await promise;

    // Check if this task has been superseded by another one.
    if (task.cancelled) {
      return;
    }
  }

  // Return the result.
  postMessage(total);
}

I'm not totally happy about it: it relies on postMessage() events being processed in FIFO order, which I doubt is guaranteed. I suspect you could rewrite the code to make it work even if that isn't true.
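As written, each call awaits its message before posting the next one, so at most one message is in flight per channel and the relative order of messages should not come into play. A possible refinement, sketched under the assumption that a short delay before reacting is acceptable, is to yield only once a time slice has elapsed instead of on every iteration. `createYielder`, `sumDown` and the 8 ms default are hypothetical choices, not a standard API; the cancellation check mirrors the `task.cancelled` pattern above.

```javascript
// Yields to the event loop through a MessageChannel, but only once the
// current time slice is used up, so most iterations pay no yielding cost.
function createYielder(sliceMs = 8) { // 8 ms is an arbitrary example budget
  const { port1, port2 } = new MessageChannel();
  let resolveNext = null;
  // Only one message is ever in flight, so message ordering is not an issue.
  port2.onmessage = () => resolveNext();
  let sliceStart = performance.now();
  return {
    shouldYield: () => performance.now() - sliceStart >= sliceMs,
    yieldNow() {
      return new Promise((resolve) => {
        resolveNext = resolve;
        port1.postMessage(null);
      }).then(() => {
        sliceStart = performance.now(); // a new slice starts after the yield
      });
    },
    close: () => port1.close(), // release the channel (also lets Node exit)
  };
}

// The same nonsense computation as above, cancellable at each yield.
async function sumDown(task, data, sliceMs) {
  const yielder = createYielder(sliceMs);
  let total = 0;
  try {
    while (data !== 0) {
      total += data;
      --data;
      if (yielder.shouldYield()) {
        await yielder.yieldNow(); // pending messages can run here
        if (task.cancelled) return null; // superseded by a newer request
      }
    }
    return total;
  } finally {
    yielder.close();
  }
}

// In a Worker (sketch):
// let currentTask = { cancelled: false };
// onmessage = (e) => {
//   currentTask.cancelled = true;
//   currentTask = { cancelled: false };
//   sumDown(currentTask, e.data).then((t) => { if (t !== null) postMessage(t); });
// };

sumDown({ cancelled: false }, 1000).then((t) => console.log(t)); // 500500
```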
