How we optimized cluster-wide scheduling for sticky workloads

April 17, 2024

Channable is a tool for marketers and web shop owners that connects shops to marketplaces, affiliate platforms, and price comparison websites. We download product data through a feed file or API connection, process it, and send the transformed data to any platform. The volume of data can be impressive (1.7 million products are processed per second on average), so we need strategies to make all this number crunching imperceptible to our customers.

Not only do we need to process a very large number of products every day, but we also want low latency without unsustainable processing costs. Performance is not just a nice-to-have: being able to process great volumes of data is a requirement for our larger customers, and many users appreciate the immediacy of changing a rule or setting in Channable and seeing the results within seconds.

In Channable, customers modify their data using IF … THEN … ELSE … rules. These rules are sent to one of our rule processing servers. The chosen rule processing server applies the rules to the product data and returns the processed product data for other services to use.

In this post we describe how we increased the performance of our product feed processing pipeline by changing how we select the rule processing server for a given unit of work (a job). With our new scheduling system we were able to significantly reduce waiting times in our cluster, decreasing the 95th percentile of job queueing times by a factor of almost 4.5.

Problem statement

People who have read our previous blogs will have some idea of the things we do to make the jobs that our rule processing servers run as fast as possible. For optimal performance and reliability, we scale rule processing horizontally across a set of servers. Our original method of assigning rule processing jobs to rule processing servers used a pseudo-random selection of servers based on the project identifier and the set of currently-available rule processing servers.

The selection process was designed to evenly divide the processing workload across the available rule processing servers by generating a preference list of servers for each project. This preference list was then used to pick a server to run the project on, by trying each of the servers on the list in order and using the first server that responded successfully.

This approach was chosen to make the rule processing system more robust against failures where a single server would be unavailable, improving reliability and making operations easier.
The selection process consisted of the following steps:

  1. Query Consul to retrieve the list of currently-healthy rule processing servers,
  2. Sort this list of servers alphabetically,
  3. Shuffle the list of servers using a pseudo-random number generator seeded with the project identifier,
  4. Pick the first three servers in the resulting shuffled list and try to submit the rule processing job to each server in order.
    If a server fails to respond, move on to the next server. If all three servers fail to respond, error out.
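
For illustration, here is a minimal sketch of this selection process in Python. The function names and the try_submit callback are ours, not the actual production code:

    import random

    def preference_list(project_id: int, healthy_servers: list[str], n: int = 3) -> list[str]:
        """Build the deterministic preference list for a project.

        healthy_servers is the list obtained in step 1 by querying Consul.
        """
        servers = sorted(healthy_servers)           # step 2: sort alphabetically
        random.Random(project_id).shuffle(servers)  # step 3: shuffle, seeded with the project id
        return servers[:n]                          # step 4: keep the first three candidates

    def submit_job(project_id: int, healthy_servers: list[str], try_submit) -> str:
        """Try each preferred server in order; error out if all of them fail."""
        for server in preference_list(project_id, healthy_servers):
            if try_submit(server):                  # e.g. an HTTP request to the server
                return server
        raise RuntimeError("all preferred servers failed to respond")

Because the shuffle is seeded with the project identifier, the same project keeps landing on the same few servers for as long as the set of healthy servers stays the same.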

The selection process is deterministic: given the same project identifier and set of available servers the same preference list is generated every time. The reason for using a preference list in this way was that the rule processing servers perform caching: if two rule processing jobs for the same project are processed at the same time on the same server, the rule processing server will deduplicate shared work where possible and e.g. only download the project's input data once. By making each project prefer the same set of servers we could take advantage of the caching without having to explicitly check on which servers the project was in cache.
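
As a rough sketch of this kind of deduplication, assuming a per-server cache keyed by project and a hypothetical download function:

    from threading import Lock

    class ProjectInputCache:
        """Download each project's input data at most once per server."""

        def __init__(self, download_fn):
            self._download_fn = download_fn         # hypothetical: fetches a project's input data
            self._cache: dict[int, object] = {}
            self._locks: dict[int, Lock] = {}
            self._guard = Lock()

        def _lock_for(self, project_id: int) -> Lock:
            with self._guard:
                return self._locks.setdefault(project_id, Lock())

        def get(self, project_id: int):
            # Concurrent jobs for the same project serialize here, so the
            # second job reuses the data downloaded by the first one.
            with self._lock_for(project_id):
                if project_id not in self._cache:
                    self._cache[project_id] = self._download_fn(project_id)
                return self._cache[project_id]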

While the described selection process was easy to implement and did not require coordination between the various services accessing the rule processing servers, it turned out to have some significant downsides:

  • The selection process did not take the servers' actual workload into account when assigning jobs to servers. The selection process only checked if rule processing servers were 'healthy' in Consul, i.e. online and not crashed, but not whether the servers were actually able to accept new jobs.

    The selection process would happily assign jobs to servers that were already at their processing capacity, which resulted in these jobs having to wait in the server's queue. This would happen even if other servers were idle, which is not ideal.

  • The rule processing workload was not evenly distributed among the rule processing servers: while the selection process did distribute the various projects roughly evenly across the available servers, the computational power required for each project varies wildly.
    This resulted in some rule processing servers often being overloaded, while other servers were idle at the same time.

  • The selection process was not robust against changes in the set of available servers: if any server was added to or removed from the set of available servers, almost all of the project-to-server assignments would change.
    This behavior was not originally seen as a problem, because when the scheduling algorithm was implemented the set of available servers was relatively stable. Channable later switched a part of the rule processing servers to "preemptible" virtual machines, which have a fixed lifetime and are regularly unavailable.
    This increased churn in the set of available servers, and thus in job-to-server assignments, meant that we could use the cache on the rule processing servers less effectively, which caused longer job runtimes.

Experimenting with better approaches

As part of Maarten's Master's thesis project[1] we evaluated various alternative load balancing methods that could offer better performance than the method described above. To evaluate these methods we ran experiments using recorded production data in an isolated experiment environment.

In the experiment environment we replicated our production workload by 'replaying' 24 hours of HTTP requests to the rule processing servers. The servers the requests were sent to were picked by a custom-built scheduler service, as opposed to the decentralized server selection method used in production.

For each of the sent requests we recorded whether the request was successful, how long it took to perform and how much of this time was spent waiting in a server's queue.

To address the problems described in the previous section, we split our server selection process into two distinct requirements:

  1. Overload detection: determining if servers were overloaded and should not receive any work, and
  2. Server selection: picking a server from the set of servers returned by the overload detection process.
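
Conceptually, the scheduler composes these two requirements as follows (a sketch with invented names, not our actual scheduler service):

    from typing import Protocol

    class OverloadDetector(Protocol):
        def available(self, servers: list[str]) -> list[str]:
            """Return only the servers that may currently accept new work."""

    class ServerSelector(Protocol):
        def pick(self, project_id: int, servers: list[str]) -> str:
            """Pick one server out of the non-overloaded candidates."""

    def schedule(project_id: int, all_servers: list[str],
                 detector: OverloadDetector, selector: ServerSelector) -> str:
        candidates = detector.available(all_servers)  # requirement 1: overload detection
        if not candidates:
            raise RuntimeError("every rule processing server is overloaded")
        return selector.pick(project_id, candidates)  # requirement 2: server selection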

During each of the experiments we chose a single overload detection method and a single server selection method, to evaluate the performance of these two methods combined.

Overload detection approaches

For our overload detection method we compared three approaches:

  1. The baseline approach of not doing any overload detection: this approach matched our production approach of "just send requests until the server isn't available anymore".

  2. Circuit breaker: cut off requests to a server immediately if it has more than a fixed number of outstanding work items that are not making any progress (a rough sketch of this idea follows the list).

  3. WeChat's DAGOR load-shedding algorithm [2]: this algorithm gradually reduces the rate of requests flowing to a server if the server is overloaded, instead of cutting off a server immediately.
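
As an illustration of option 2, a per-server circuit breaker might look roughly like this; the threshold and the notion of "not making progress" are simplified for the sketch:

    class CircuitBreaker:
        """Per-server breaker: stop sending work once too many items are stuck."""

        def __init__(self, max_stalled: int = 10):    # threshold chosen for illustration
            self.max_stalled = max_stalled
            self.stalled_items = 0                    # outstanding items not making progress

        def item_stalled(self) -> None:
            self.stalled_items += 1

        def item_progressed(self) -> None:
            self.stalled_items = max(0, self.stalled_items - 1)

        def allows_requests(self) -> bool:
            # The server is cut off as soon as it crosses the threshold and
            # becomes eligible again once the stuck items start draining.
            return self.stalled_items < self.max_stalled

    class CircuitBreakerDetector:
        """Overload detection step: keep only servers whose breaker is still closed."""

        def __init__(self, breakers: dict[str, CircuitBreaker]):
            self.breakers = breakers

        def available(self, servers: list[str]) -> list[str]:
            return [s for s in servers if self.breakers[s].allows_requests()]

This kind of detector fills the overload detection slot of the two-step split described earlier.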

Host selection approaches

For our host selection we also compared three approaches:

  1. The baseline approach of selecting servers by shuffling the list of servers based on the project identifier.

  2. Just randomly selecting any available server.

  3. 'Cache-aware' host selection.
    For this approach we kept track of the state of each server's cache, along with the number of rule processing jobs that were being executed on that server.
    When a host must be selected, the dynamic host selector picks the server where the project is in cache (if any), and otherwise picks the server with the fewest rule processing jobs in progress.
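
A minimal sketch of this cache-aware selection, assuming the scheduler tracks which servers have a project in cache and how many jobs each server is currently running (the bookkeeping methods are invented for the example):

    class CacheAwareSelector:
        """Prefer a server with the project in cache, else the least busy one."""

        def __init__(self):
            self.jobs_in_progress: dict[str, int] = {}  # server -> running job count
            self.cached_on: dict[int, set[str]] = {}    # project id -> servers with a warm cache

        def pick(self, project_id: int, servers: list[str]) -> str:
            warm = [s for s in servers if s in self.cached_on.get(project_id, set())]
            # Preferring the least busy server among cache hits is our own
            # tie-breaking choice for this sketch.
            candidates = warm or servers
            return min(candidates, key=lambda s: self.jobs_in_progress.get(s, 0))

        def job_started(self, project_id: int, server: str) -> None:
            self.jobs_in_progress[server] = self.jobs_in_progress.get(server, 0) + 1
            self.cached_on.setdefault(project_id, set()).add(server)

        def job_finished(self, server: str) -> None:
            self.jobs_in_progress[server] = max(0, self.jobs_in_progress.get(server, 1) - 1)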

Experimental results

In our results we distinguish between jobs with low and high priorities. Jobs with low priority are triggered by automated processes and comprise the vast majority of the jobs in our system. Jobs with high priority are triggered in response to user actions in the Channable tool; to show their results to the user as soon as possible, execution of low-priority jobs is paused whenever high-priority jobs are present.

To evaluate our overload detection methods and host selection methods we compared waiting times and total execution times for the rule processing jobs, grouped by priority.

We compared these values by looking at the averages, and by looking at the outlier percentiles (90th, 95th, and 99th percentile). Each of the results in the figures below is labeled with the overload detection method and the host selection method used in the experiment, respectively.

The experiments using fully-random host selection are not included in our results: during preliminary testing we found that random host selection performed significantly worse than the baseline method, causing the benchmarking environment to grind to a halt and taking more than an hour longer than the production environment to execute all of the jobs in the benchmark. Because of this result we excluded random host selection from further experimentation and did not run a full 24-hour experiment with it.

We first show the average job duration (i.e. total processing time) and waiting time:

Average job duration by priority

Average waiting time by priority

The waiting time and job duration of the high-priority jobs did not differ greatly between experiments (though they did improve), but the low-priority duration did.
Looking at the percentile comparison we can see that this is due to the outliers improving significantly:

Low-priority duration percentiles

Low-priority waiting time percentiles

After our experiments we also noticed several other interesting outcomes:

  • The baseline experiment performed worse on average than the production environment did during the period when our experiment data was gathered.
    This turned out to be caused by the experiment environment having different retry behavior than the production environment, which meant the experiment environment was stressed more severely than the production environment had been.

  • Overload detection using the DAGOR algorithm performed slightly worse than the circuit breaker algorithm, while we had expected it to outperform the circuit breaker.
    On closer inspection we found that the DAGOR algorithm would almost always fully cut off requests to an overloaded server, effectively acting like a delayed circuit breaker. It is possible that DAGOR could outperform a static circuit breaker with more tuning, but we did not investigate further.

  • All approaches, apart from the baseline, greatly reduced the variance in wait times between servers, shown in the graph below.
    This is to be expected if the worst-case percentiles of the waiting time are reduced, but also nice to see.

Wait time variance

There are of course some caveats because we were using an experiment environment, most notably that we used static data.
The product feed data in our experiment environment was taken from a snapshot and did not change over time, which meant that the cache in our rule processing servers was likely more effective than it would have been in the real world.

Applying the new algorithm in the real world

While there are some caveats to our results, a circuit breaker overload detector with cache-aware (dynamic) host selection appeared to be the best solution. We were confident that implementing this type of scheduling in production would lead to a performance improvement, so that's what we set out to do next.

The following graph shows the 95th percentile of the duration and waiting time for our low-priority rule processing jobs during the eight days before and after the new scheduling algorithm was enabled in our production environment.

Job waiting time and duration in production around deploy

We can see that with the new scheduler active the queueing time drops dramatically: in the 24 hours before the scheduler was activated, the 95th percentile of the queueing time was 18.42 seconds, while in the 24 hours after the scheduler was activated this metric dropped to 4.11 seconds. This improvement in turn reduces the overall processing time. Nice!

We also saw an improvement in backlog imbalance, another metric which compares the backlog size of each server against the average backlog size across all servers. The backlog imbalance should ideally be close to zero (indicating the server has a backlog close to the average size), and the worst-case backlog imbalance also went down once we activated the new scheduling algorithm:

Backlog imbalance in production around deploy
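
As an illustration, a simplified version of this metric can be computed as each server's deviation from the cluster-wide average backlog (the exact scaling used in the graph above is omitted):

    def backlog_imbalance(backlog_sizes: dict[str, int]) -> dict[str, float]:
        """Per-server deviation from the average backlog size (simplified).

        A value near zero means the server's backlog is close to the cluster
        average; large positive values indicate a server that has accumulated
        much more work than its peers.
        """
        average = sum(backlog_sizes.values()) / len(backlog_sizes)
        return {server: size - average for server, size in backlog_sizes.items()}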

These improvements in queueing behavior benefited both us and our end users:

  • End users get faster execution of rules and imports.
  • We get to make better use of our servers, as the workload is distributed in a more effective way. Along with some other improvements in the execution of jobs, this allowed us to save money by deactivating a dedicated server that we used for a few very heavy tasks. In addition, we now also need fewer general-purpose machines to run our jobs.

Conclusion

At Channable, we process very large amounts of data for our customers. Fast and reliable processing with minimal delay is important to our customers, and using minimal resources allows us to keep costs down. We hypothesized that the most compute-heavy part of our system could distribute its workload across servers more efficiently while lowering the delay in job processing. We tried various methods in an experimental setup and found that the ‘circuit-breaker, dynamic’ approach yielded the best results, while being simpler than the DAGOR alternative we found in the literature. Because of the good experimental results, we implemented this approach in production and improved processing delay while requiring fewer processing resources, saving costs.


1: For more information on the experiments and our rule processing system you can read Maarten's thesis at https://studenttheses.uu.nl/handle/20.500.12932/43909.

2: For more information see: "Overload Control for Scaling WeChat Microservices" by Zhou et al., https://arxiv.org/abs/1806.04075

Diego Diverio, Software Development
Maarten van den Berg, DevOps
