DEV Community

YCM Jason
YCM Jason

Posted on

Limit concurrent asynchronous calls

Although Javascript is designed to be single threaded, you could still do things concurrently.

For example, we can read multiple files concurrently.

const readFile = require('util').promisify(require('fs').readFile);

const readAllFiles = async (paths) => {
  return await Promise.all(paths.map(p => readFile(p, 'utf8')));
}
Enter fullscreen mode Exit fullscreen mode

However, reading files could be quite computationally expensive; if there are more than 10k paths, you will probably hear the fans on your machine speed up as your machine struggles. Your node server/program will respond significantly slower too as there are 10k+ file reading operations in the OS's thread-pool competing with the node server.

The solution is simple. Simply limit the number of file reading operations in the thread-pool. In another words, limit the number of concurrent calls to readFile.

Let's define a generic function asyncLimit(fn, n) which will return a function that does exactly what fn does, but with the number of concurrent calls to fn limited to n. We will assume fn returns a Promise.

const asyncLimit = (fn, n) => {
  return function (...args) {
    return fn.apply(this, args);
  };
};
Enter fullscreen mode Exit fullscreen mode

Since we know that asyncLimit returns a function that does whatever fn does, we first write this out. Note that we don't use arrow function as fn might need the binding to this. Arrow function does not have it's own binding.

If you are not familiar with this in Javascript, read my article explaining what is this later. For now, just ignore it.

const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return function (...args) {
    const p = fn.apply(this, args);
    pendingPromises.push(p);
    return p;
  };
};
Enter fullscreen mode Exit fullscreen mode

Since fn returns a Promise, we could keep track of the "process" of each call by keeping the promises they returns. We keep those promises in the list pendingPromises.

const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    if (pendingPromises.length >= n) {
      await Promise.race(pendingPromises);
    }

    const p = fn.apply(this, args);
    pendingPromises.push(p);
    return p;
  };
};
Enter fullscreen mode Exit fullscreen mode

We mark our returning function as async, this enables us to use await in the function. We only want to execute fn only if there are less than n concurrent calls going on. pendingPromises contains all previous promises. So we can just check the pendingPromises.length to find out how many concurrent calls there are.

If pendingPromises.length >= n, we will need to wait until one of the pendingPromises finishes before executing. So we added await Promise.race(pendingPromises).

const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    if (pendingPromises.length >= n) {
      await Promise.race(pendingPromises);
    }

    const p = fn.apply(this, args);
    pendingPromises.push(p);
    await p;
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};
Enter fullscreen mode Exit fullscreen mode

We want to get rid of the promise in the pendingPromises once they are finished. First we execute fn, and it returns p. Then we add p to the pendingPromises. After this, we can do await p; p will be finished after this line. So we simply filter out p from pendingPromises.

We are almost done. Let's recap what we are doing here:

if pendingPromises.length < n

  1. we call fn and obtain the promise p
  2. push p onto pendingPromises
  3. wait p to finish
  4. remove p from pendingPromises
  5. return p

if pendingPromises.length >= n, we will wait until one of the pendingPromises resolves/rejects before doing the above.

There is one problem with our code tho. Let's consider the following:

const f = limitAsync(someFunction, 1);
f(); // 1st call, someFunction returns promise p1
f(); // 2nd call, someFunction returns promise p2
f(); // 3rd call, someFunction returns promise p3
Enter fullscreen mode Exit fullscreen mode

The first call goes perfectly and pendingPromises.length becomes 1.

Since pendingPromises.length >= 1, we know that both 2nd and 3rd call will be calling await Promise.race([p1]). This means that when p1 finishes, both 2nd and 3rd calls will both get notified and executes someFunction concurrently.

Put it simple, our code does not make the 3rd call to wait until the 2nd call has finished!

We know that 2nd call will get notified first and resumes from await Promise.race([p1]). 2nd call executes someFunction and pushes its promise to pendingPromises, then it will do await p.

As 2nd call does await p, 3rd call will resume from await Promise.race([p1]). And here is where the problem is. The current implementation allow the 3rd call to execute someFunction and blah blah blah that follows.

But what we want is that the 3rd call would check pendingPromises.length >= n again and do await Promise.race([p2]). To do this, we could simply change if to while.

So the final code would be:

const asyncLimit = (fn, n) => {
  let pendingPromises = [];
  return async function (...args) {
    while (pendingPromises.length >= n) {
      await Promise.race(pendingPromises).catch(() => {});
    }

    const p = fn.apply(this, args);
    pendingPromises.push(p);
    await p.catch(() => {});
    pendingPromises = pendingPromises.filter(pending => pending !== p);
    return p;
  };
};
Enter fullscreen mode Exit fullscreen mode

Notice that I have added .catch(() => {}) to the Promise.race and await p. This is because we don't care if the promise resolves or rejects, we just wanna know if they are finished.

I have publish this to npm if you wish to use. Here is the github link if you want to see how I added tests for this function.

What do you think? Did you follow the tutorial?

EDIT:

Top comments (14)

Collapse
 
smishr4 profile image
Shubham Mishra

I implemented this whole thing with a working code, let me know if I am doing something incorrectly.

Note: I am using setTimeout for async calls so promise failure will not happen.

const ASYNC_LIMIT = 2;

function scheduler(cb, id, delay) {
    return new Promise((resolve, reject) => {
        setTimeout(() => {
            cb(id);
            resolve();
        }, delay);
    });
}

const asyncLimit = (fn, n) => {
    let promiseArray = [];

    return async function(...args) {
        if (promiseArray.length >= n) {
            await Promise.race(promiseArray);
        }

        let p = fn.call(this, ...args);
        promiseArray.push(p);

        p.then(() => {
            promiseArray = promiseArray.filter(pending => p !== pending);
        });

        return p;
    };
};

let cb = id => {
    console.log(id + " task completed", Date.now() % 10000);
};

let modifiedScheduler = asyncLimit(scheduler, ASYNC_LIMIT);

modifiedScheduler(cb, 1, 5000);
modifiedScheduler(cb, 2, 2000);
modifiedScheduler(cb, 3, 1500);
modifiedScheduler(cb, 4, 3000);
modifiedScheduler(cb, 5, 4000);
modifiedScheduler(cb, 6, 1000);
modifiedScheduler(cb, 7, 2500);

Collapse
 
tconrado profile image
tconrado

hey, tried your code, did not work...

so, assigned the same delay for all of those...like 2 seconds... I did expect to see it process 2, schedule + 2; so, the result would be observable every 2 seconds and everytime 2 task

Collapse
 
tconrado profile image
tconrado

using the corrections from @kusha and the data and functino of your code it did work as expected!
hurray!

exceptional implementation to deal with REST API

Collapse
 
smishr4 profile image
Shubham Mishra

Can you post your working snippet here?

Thread Thread
 
tconrado profile image
tconrado • Edited

hey I'm terrible with markdown, but the example bellow is in a lib, basically it does limit the number of active connections to a http API rest service to the a defined number (8 in this case), so a the http request is resolved, it start another connection keeping always 8 active connections to the API server; it will completely hide the connection/handshake delay while guarantee no 429 error (too many requests); from my experience fastest safe approach as you can know the maximum number of calls per second of the API service


// this is the asyncLimit adjusted 
const asyncLimit = (fn, n) => {
  const pendingPromises = new Set();
  return async function(...args) {
    while (pendingPromises.size >= n) {
      await Promise.race(pendingPromises);
    }
    const p = fn.apply(this, args);
    const r = p.catch(() => {});
    pendingPromises.add(r);
    await r;
    pendingPromises.delete(r);
    return p;
  };
};

// native node.js https module to connect to shopify servers
const https = require('https')
exports.httpRequest = function(method, path, body = null) {
  const reqOpt = { 
    method: method,
    path: '/admin' + path,
    hostname: 'xxxxxxxxxxxxxxxxxxxx.myshopify.com', 
    headers: {
      "Content-Type": "application/json",
      "X-Shopify-Access-Token": "xxxxxxxxxxxxxxxxxxxx",
      'Cookie': '',
      "Cache-Control": "no-cache"
    }
  }
  if (body) reqOpt.headers['Content-Length'] = Buffer.byteLength(body);
  return new Promise((resolve, reject) => {

      const clientRequest = https.request(reqOpt, incomingMessage => {
          let response = {
              statusCode: incomingMessage.statusCode,
              headers: incomingMessage.headers,
              body: []
          };
          let chunks = ""
          incomingMessage.on('data', chunk => { chunks += chunk; });
          incomingMessage.on('end', () => {
              if (chunks) {
                  try {
                      response.body = JSON.parse(chunks);
                  } catch (error) {
                      reject(error)
                  }
              }
              resolve(response);
          });
      });
      clientRequest.on('error', error => { reject(error); });
      if (body) { clientRequest.write(body)  }  
      clientRequest.end();

  });
}


// the number 8 bellow can be changed to match the REST API service limits
// assume that this amount will call at once and will be replaced dynamically, hence
// if the service limit it 20 calls per second, be aware that 8 calls will hit the service at once
// using 40% of the maximum (avoid going higher)

exports.ratedhttpRequest = asyncLimit(exports.httpRequest, 8);

Collapse
 
kepta profile image
Kushan Joshi

Great article! Some minor improvements.

const asyncLimit = (fn, n) => {
  const pendingPromises = new Set();
  return async function(...args) {
    while (pendingPromises.size >= n) {
      await Promise.race(pendingPromises);
    }

    const p = fn.apply(this, args);
    const r = p.catch(() => {});
    pendingPromises.add(r);
    await r;
    pendingPromises.delete(r);
    return p;
  };
};
Enter fullscreen mode Exit fullscreen mode
Collapse
 
ycmjason profile image
YCM Jason

This is nice! 👍👍👍

Collapse
 
lednhatkhanh profile image
Nhat Khanh • Edited

Thank you so much for this post, learn a lot from this brilliant idea, also I added types (typescript) for this function in case someone needs this:

export function asyncLimit<T extends (...args: any) => Promise<any>>(fn: T, n: number): T {
  let pendingPromises = [] as Promise<ReturnType<T>>[];

  return async function limitedFunction(this: ThisType<T>, ...args: Parameters<T>) {
    while (pendingPromises.length >= n) {
      await Promise.race(pendingPromises);
    }

    const p = fn.apply<ThisType<T>, Parameters<T>, Promise<ReturnType<T>>>(this, args);

    pendingPromises.push(p);
    await p;
    pendingPromises = pendingPromises.filter(promise => promise !== p);

    return p;
  } as T;
}
Collapse
 
benjaminblack profile image
Benjamin Black • Edited

As written, asyncLimit will not resolve until the async function completes, because of await p.catch(() => {});.

Instead,

const p = fn.apply(this, args);
pendingPromises.push(p);
p.finally(() => {
    pendingPromises = pendingPromises.filter(pending => pending !== p);
});
return p;

Collapse
 
ycmjason profile image
YCM Jason

But we need the async function fn to complete before resolving the async function (...args) {. Isn't it?

Collapse
 
benjaminblack profile image
Benjamin Black • Edited

It's difficult to mentally follow the chain of promises here, because you have an async function (const asyncLimit = async...) which returns an async function (return async function) which itself awaits at least two promises (one in a loop) before resolving.

Note that asyncLimit does not have to be async, as it does not use await; removing async from the function signature would help comprehension a bit, since you would reduce one level of Promise-ception and stay out of limbo.

I guess it doesn't really matter, because the promise returned by asyncLimit can be used by the calling code. But attaching a finally clause to p instead of awaiting it allows the function to return immediately, instead of waiting for p to resolve.

Thread Thread
 
ycmjason profile image
YCM Jason

oh, I mistyped haha! Thanks for catching this!

asyncLimit shouldn't be an async function.

Collapse
 
benjaminblack profile image
Benjamin Black • Edited

No, because the function returns p itself, not the chained promise. The caller can attach its own .catch() clauses to p.

As in,

function foo() {
    let p = Promise.reject();

    p.catch(() => console.log('gotcha'));

    return p;
}


let p = foo();

p.catch(() => console.log('gotcha again'))
Collapse
 
ycmjason profile image
YCM Jason

Thanks!! Benjamin's reply is accurate! :)