Polling is Hard

I've spent several years working in backend and data-related jobs, and there are some tasks that keep popping up; one of them is the need to "listen to that database and be notified when anything changes".

This problem has different solutions depending on the year you had to solve it and the technologies your company uses.

Years ago, if you were using an open source database you probably had to do polling, a technique that basically relies on continuously querying the database for changes. If you used a proprietary database you probably had an expensive addon that you could just activate to get data out of the database every time it changed.

Then the Lambda architecture appeared and people started talking about "streaming" architectures, that is, data flowing through "pipes", with processes reading from them and processes writing into them.

These kinds of patterns clashed with some of the ideas behind databases in general. Up to that moment, databases were meant to keep data stored safely; they were mostly designed to keep the data in there, not to keep it and stream it.

Different database vendors saw a business opportunity (or need) and started to offer different ways to accomplish this, and in parallel there was a big investment in Change Data Capture systems (CDC from now on), such as Debezium, which do exactly this: listen to a database and send its inserts/updates/deletes in real time to a streaming platform, such as Kafka.

Unfortunately, sometimes this is not enough. There are still situations nowadays where you need to roll up your sleeves and implement a polling mechanism against an ancient database, a weird CRM system or ... god knows what.

Let me tell you about some of my experiences with polling, because it sounds simple, but it can get tricky.

Polling basics

First things first. Here are a few points you need to know about your data source before writing a single line of code.

Conceptually, polling is the act of accessing the data source to fetch all the rows that have changed since your last poll, and doing that forever, resulting in a flow of data.

In the next sections we are going to dissect a polling setup and understand all the implications that need to be taken care of:

  1. How do we know that the data has been updated?
  2. What happens when there is a deletion?
  3. How are you going to read the data from the db the first time without bringing the thing down?
  4. Resiliency: How is your poller going to survive reboots, connection issues, ...?
  5. Does your datasource have any peculiarity?

How do we know that the data has been updated?

Have a look at the tables you'll need to read; all of them will need to follow a similar pattern. The idea is that you do:

1) select all data // initial load
2) select all data that changed since [step 1] finished
3) select all data that changed since [step 2] finished
...
n) select all data that changed since [step n-1] finished
...

The question to answer here is: how do I know what changed between [step n] and [step n+1]? The answer depends on the database engine and the table schema.

Luckily there are some common table design patterns that are helpful when dealing with this (see the sketch after the list):

  1. Incremental ids.
  2. Last updated timestamp.
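
Here's a minimal sketch of pattern 1 on a throwaway sqlite database; `table1`, `col1` and the 5-second interval are made up for the example, and pattern 2 works the same way with a timestamp column and the epoch as the starting checkpoint:

  import sqlite3
  import time

  # Sketch only: `table1` and `col1` are hypothetical names.
  conn = sqlite3.connect(":memory:")
  conn.execute("CREATE TABLE table1 (id INTEGER PRIMARY KEY, col1 TEXT)")

  checkpoint = 0  # highest id streamed so far

  while True:
      rows = conn.execute(
          "SELECT id, col1 FROM table1 WHERE id > ? ORDER BY id", (checkpoint,)
      ).fetchall()
      for row_id, col1 in rows:
          print("streaming", row_id, col1)  # stand-in for the real streaming call
          checkpoint = row_id               # advance the in-memory checkpoint
      time.sleep(5)                         # poll interval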

What about db triggers?

Relational databases have triggers, well, respectable databases have triggers. Should we use them in these situations? Probably yes.

The concept is simple: create a trigger so that, for every change in the streamed table, a "delta" row describing what changed is inserted into another table, and that second table is what you are going to stream.

The format of this table should be tailored for streaming, for example:

CREATE TABLE cdc_events (
    event_id   INT NOT NULL AUTO_INCREMENT,  -- so you'll be able to
                                             -- checkpoint this table easily
    data       TEXT NOT NULL,                -- contents of the event
    operation  VARCHAR(10) NOT NULL,         -- insert, update or delete?
    updated_at TIMESTAMP NOT NULL,           -- timestamp when this event
                                             -- happened; also makes it
                                             -- easy to checkpoint
    PRIMARY KEY (event_id)
);

After you have this table ready you only need to do basic polling, streaming, and deleting what you just polled (you definitely don't want this table to grow indefinitely). So:

  1. Run the SELECT.
  2. Get the checkpoint (`event_id`).
  3. Stream the rows.
  4. Delete from the table up to that `event_id`.

This is called the “Outbox Pattern”, by the way, and it's a well-known solution for these kinds of situations, provided you can easily create these auxiliary tables. The sketch below shows the whole flow.
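
Sketched end to end on sqlite, with a made-up `customers` source table and sqlite trigger syntax standing in for whatever your engine uses:

  import sqlite3

  conn = sqlite3.connect(":memory:")
  conn.executescript("""
  CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
  CREATE TABLE cdc_events (
      event_id   INTEGER PRIMARY KEY AUTOINCREMENT,
      data       TEXT NOT NULL,
      operation  TEXT NOT NULL,
      updated_at TEXT NOT NULL
  );
  CREATE TRIGGER customers_updates AFTER UPDATE ON customers
  BEGIN
      INSERT INTO cdc_events (data, operation, updated_at)
      VALUES (NEW.name, 'update', datetime('now'));
  END;
  """)

  def poll_outbox():
      rows = conn.execute(                   # 1. run the SELECT
          "SELECT event_id, data, operation, updated_at FROM cdc_events ORDER BY event_id"
      ).fetchall()
      if not rows:
          return
      checkpoint = rows[-1][0]               # 2. get the checkpoint (event_id)
      for row in rows:
          print("streaming", row)            # 3. stream (stand-in call)
      conn.execute(                          # 4. delete up to that event_id
          "DELETE FROM cdc_events WHERE event_id <= ?", (checkpoint,)
      )
      conn.commit()

  conn.execute("INSERT INTO customers (id, name) VALUES (1, 'John Smith')")
  conn.execute("UPDATE customers SET name = 'John Doe' WHERE id = 1")
  poll_outbox()  # streams the 'update' event and trims the outbox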

What happens when there is a deletion?

This is, in fact, a very important point. You can delete data from a database in two different ways:

  • Hard deletes: This is when rows are really deleted, using a SQL DELETE FROM; the data just goes away.
  • Soft deletes (or logical deletes): This is when rows are not physically deleted from the db. Usually there is a column that indicates that the row is deleted; this technique is called tombstones. So, instead of deleting the row with a SQL DELETE you just update the tombstone field to true.

The polling mechanism will need to take this into account.

If the table is using hard deletes you'll have a very bad time. Basically, it cannot be done without triggers, which is not great.

When the table that you need to stream is using hard deletes you'll need to set up a trigger that captures the deletion and writes it into an auxiliary table, and your poller will need to check this table as well and stream it. It's a cumbersome process and you might get out-of-order events in your stream, as you'll need to set up two different pollers (one for your main table, and one for the deletions table that's populated via triggers).
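
For illustration, a sketch of the capture side, again with sqlite trigger syntax as a stand-in and hypothetical `customers` / `deleted_rows` tables:

  import sqlite3

  conn = sqlite3.connect(":memory:")
  conn.executescript("""
  CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
  CREATE TABLE deleted_rows (
      id         INTEGER NOT NULL,
      deleted_at TEXT NOT NULL DEFAULT (datetime('now'))
  );
  CREATE TRIGGER capture_deletes AFTER DELETE ON customers
  BEGIN
      INSERT INTO deleted_rows (id) VALUES (OLD.id);
  END;
  """)

  conn.execute("INSERT INTO customers (id, name) VALUES (1, 'John Smith')")
  conn.execute("DELETE FROM customers WHERE id = 1")
  # The deletion is now visible to the second poller:
  print(conn.execute("SELECT id, deleted_at FROM deleted_rows").fetchall())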

On the other hand, if the system you need to stream is using soft deletes (or logical deletes) you're lucky: you don't need to do anything special other than taking the tombstone column into account.
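
For example, a tiny sketch assuming a hypothetical tombstone column `is_deleted`:

  # Hypothetical names: rows with the tombstone flag set become delete
  # events, everything else an upsert.
  POLL_QUERY = "SELECT id, name, is_deleted FROM table1 WHERE last_updated > ?"

  def to_event(row):
      row_id, name, is_deleted = row
      operation = "delete" if is_deleted else "upsert"
      return {"operation": operation, "id": row_id, "name": name}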

How are you going to read the data from the db the first time without bringing the whole thing down?

The initial load might or might not be a problem, depending on the size and use of the database.

If you're lucky enough to have a small table, or a database that's not heavily used in production, you can set your initial id to 0, your last_updated timestamp to 1970, or whatever works for your checkpoint strategy. In the end, you'll be doing a kind of SELECT * over the table contents.
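
In code that's nothing more than a sentinel starting checkpoint (made-up constant names):

  # With either sentinel the very first poll degenerates into a
  # full-table SELECT.
  INITIAL_ID_CHECKPOINT = 0                      # for incremental ids
  INITIAL_TS_CHECKPOINT = "1970-01-01T00:00:00"  # "the epoch", for timestamps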

Is this a problem? Well, for a 1,000-row table it might not be, but if you have some millions of rows in there and the database sees serious usage, you definitely do not want to do that.

You'll need to think this one through carefully.

There are databases that let you run "bulk" types of queries. These queries are specially designed to run big, massive jobs in the background and store the results somewhere else, where you can easily fetch and stream them.

Of course, this comes with a catch: after you've streamed your initial load you'll have to start polling from the latest timestamp that came from the original bulk. So take that into account, because you'll have to store this information somewhere.

Resiliency

What will you do when your poller goes down? Because, you know, it will eventually go down.

Let's think for a moment about what this means:

  • Connect to the db you need to stream.
  • Run a long query (the initial dump).
  • Process the results of the query: mapping data, filtering, ...
  • Stream the results to the streaming engine (Kafka, Kinesis, ...).

Any experienced developer will realise that ANY of these points can fail in multiple ways. So the polling system you implement will need to be able to react to failures at each of these well-known integration points.

In my experience that means you need to checkpoint, checkpoint, checkpoint and checkpoint. And handle idempotency. And understand exactly what kind of transactional integrity you're dealing with.

Step by step. You'll realise that we can translate the previous points into different polling stages. First you need to run the SELECT query and get the results. Do you need to checkpoint? Not really. You need to checkpoint once the processed row gets into the streaming system (and you really need to make sure that the event is there, but that's a discussion for another day).

Checkpointing is writing, to durable storage, the point up to which you have successfully READ rows from the db and WRITTEN them into the streaming system. Why do I emphasise the durable storage? Because your polling system will restart eventually and you really want to make sure that, when it comes up again, it will be able to continue from the exact place where it left off. So do not store the checkpoints in RAM; use an external system or an append-only log (but make sure that the storage is permanent, VM instances tend to wipe their hard disks when they reboot).
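
Something as simple as this would do; a minimal sketch of a durable checkpoint store on a sqlite file, though the same idea applies to any external db or append-only log:

  import sqlite3

  class CheckpointStore:
      def __init__(self, path="checkpoints.db"):
          self.conn = sqlite3.connect(path)
          self.conn.execute(
              "CREATE TABLE IF NOT EXISTS checkpoints (poller TEXT PRIMARY KEY, value TEXT)"
          )

      def save(self, poller, value):
          self.conn.execute(
              "INSERT INTO checkpoints (poller, value) VALUES (?, ?) "
              "ON CONFLICT(poller) DO UPDATE SET value = excluded.value",
              (poller, value),
          )
          self.conn.commit()  # only durable once committed to disk

      def load(self, poller):
          row = self.conn.execute(
              "SELECT value FROM checkpoints WHERE poller = ?", (poller,)
          ).fetchone()
          return row[0] if row else None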

Let's expand on this. Imagine that you are running your poller with a query like this:


  SELECT id, col1, col2, col3, last_updated
  FROM table1
  WHERE last_updated > last_checkpointed_ts
  ORDER BY last_updated

So, you fetch the result iterator and, for each row, you read the data, transform it and send it through the streaming service, checkpointing each time (basically writing the last_updated field to a db elsewhere), as in the sketch below. If the system goes down, together with its "permanent" k8s storage and its RAM, it will be able to restart, go to the external DB, recover the checkpoint state and continue at the exact place where it last checkpointed. Amazing. But wait, it's not exactly that easy.
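
Putting it together, a rough sketch of that per-row loop; it reuses the CheckpointStore from above, and `transform`, `stream_event` and `conn` are hypothetical stand-ins for your mapping logic, your streaming client and your source db connection:

  def transform(row):
      return {"id": row[0], "payload": row[1:4]}

  def stream_event(event):
      print("streamed", event)  # the real call would go to Kafka, Kinesis, ...

  store = CheckpointStore()
  checkpoint = store.load("table1-poller") or "1970-01-01T00:00:00"
  cursor = conn.execute(
      "SELECT id, col1, col2, col3, last_updated FROM table1 "
      "WHERE last_updated > ? ORDER BY last_updated",
      (checkpoint,),
  )
  for row in cursor:
      stream_event(transform(row))         # WRITE to the stream first...
      store.save("table1-poller", row[4])  # ...then checkpoint the READ durably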

Resiliency in polling systems goes hand in hand with the concept of idempotency. What will the system do on repeated polls over the same data? Will you be able to detect it? Does it really matter? From the poller's point of view it's simpler to be on the safe side and, in case of doubt, poll data that has already been polled and streamed. That sounds better than leaving data behind.

This is fine as long as you document this behaviour. The clients that read from the stream will need to know that they might receive the same event more than once. This might be tricky or extremely trivial; it depends 100% on the kind of data you're sending. If you're sending financial commands to be executed, the clients might need to implement an idempotency check; if the data is just a temperature reading from a sensor, you're probably fine repeating some events from time to time. Unfortunately, idempotency is an extensive and complex topic, which is (again) for another day.

In order to have a more performant system you could try to checkpoint only when you have successfully written a whole batch (see the sketch below), but then if your system goes down in the middle of a batch it will restart at the beginning of that batch, streaming repeated data into the stream. If the problem that brought the system down is permanent (bad mapping of data, wrong types, ...) your poller will restart continuously, sending the same records over and over through the stream.
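
The batched variant would look something like this, reusing the same hypothetical names:

  # Fewer checkpoint writes, but a crash mid-batch replays the whole batch.
  while True:
      batch = cursor.fetchmany(500)
      if not batch:
          break
      for row in batch:
          stream_event(transform(row))
      store.save("table1-poller", batch[-1][4])  # one checkpoint per batch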

So, do checkpoint often.

Does your datasource have any peculiarity?

This point might seem a little bit off, but trust me, it's important.

Polling is basically running queries continuously and streaming the results, so it's no big deal if you take into account everything I've written. But wait, there is more. Every database engine (or data source) will behave differently in some corner cases, and you need to know how; otherwise, I can assure you that you'll have sleepless nights (looking at you, Salesforce).

Here are a few things you need to know if you want to create a real, resilient and trustworthy poller:

What happens if somebody modifies a row while you're fetching the select results?

This is very hard to test, but your system should be able to handle it. Let's imagine that you're reading a big chunk of data: the select you just ran returns 200 rows that have been modified during the last hour.

So you start fetching those rows from the db and, while you're doing that, somebody modifies row number 158. What happens? Well, it depends on your storage system and you need to test it, but there is a good possibility that you hit the following problem:

  1. Row 158 will have a more recent timestamp field than row number 159, and 160, and ...:

Let's illustrate this with an example:

  Results from query:
  ...
  ['id-156', 'John Smith', '+004400812832', '01-03-2021T19:58:00']  
  // this is the `last_updated` field.
  ['id-157', 'John Doe', '+004400812658', '01-03-2021T19:58:01']    
  // This row was affected one second later
  ['id-158', 'Jack Smith', '+004400812719', '01-03-2021T22:00:00'] 
  // This is NOW, it was modified while I was fetching this.
  ['id-159', 'Jack Doe', '+004400812729', '01-03-2021T19:58:02']    
  // Back at the "past"
  ...

The problem is that, if you are checkpointing at every read row, you might have an issue here. It's a corner case, yes, but it happens.

Let's suppose that you are actually doing this, checkpointing at every read operation. After reading result number 158 your system will checkpoint at timestamp ...22:00:00, which is what it is supposed to do, but then the system goes down. When it starts again it will fetch the latest stored checkpoint, get ...22:00:00, and start fetching data after that. Which means you've missed row 159, and possibly many more.

One solution for this issue is keeping the "previous checkpoint" around: if for any reason you realise that the previous checkpoint is in the future relative to the row you just read, you know this has happened, and you need to handle it.
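
A sketch of that guard with the same hypothetical names as before; `handle_out_of_order` is a stand-in for whatever strategy you choose (re-running the window, alerting, ...):

  previous = None
  for row in cursor:
      ts = row[4]  # last_updated
      stream_event(transform(row))
      if previous is not None and ts < previous:
          # The id-158 / id-159 situation: a row was modified mid-fetch,
          # so don't trust the per-row checkpoints for this window.
          handle_out_of_order(ts, previous)
      else:
          store.save("table1-poller", ts)
          previous = ts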

If you're using a temporal field for checkpointing (last_updated for example), does it have enough granularity?

This matters because it affects how you checkpoint. Imagine that a select results in these rows:


  Results from query, notice that all of them have the same `last_updated` timestamp.
  ...
  ['id-156', 'John Smith', '+004400812832', '01-03-2021T19:58:00']
  ['id-157', 'John Doe'  , '+004400812658', '01-03-2021T19:58:00']
  ['id-158', 'Jack Smith', '+004400812719', '01-03-2021T19:58:00']
  ['id-159', 'Jack Doe'  , '+004400812729', '01-03-2021T19:58:00']
  ...

This is probably the result of a batch job in the db, where different records have been created or updated at the same time and the timestamp is not granular enough. This is no problem per se and it happens a lot in reality. So far so good.

Now, let's run this query and fetch the results. We are checkpointing at every row, so you read the first result and checkpoint 01-03-2021T19:58:00. Fine. In fact, you'll be checkpointing the same timestamp for every row. Fine.

The problem is when the system goes down at row id-157, for example. The system goes down, and when it boots again it will get the checkpoint and start consuming after that, right? You've probably seen it by now: we are potentially skipping tons of records.

What's the solution to this? The idea is to either:

  1. Change the timestamp granularity to a more specific one, so this situation cannot happen.
  2. Handle it in your code. How? One option is to checkpoint iff the new checkpoint is strictly bigger than the previous one (see the sketch below). This will solve it, but in case of error you'll be re-reading the rows from the previous batch, so you'll need to handle idempotency yourself.
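
A sketch of option 2 with the same hypothetical names, assuming the resume query uses `last_updated >= ?` so rows that share the checkpointed timestamp are re-read rather than skipped:

  checkpoint = store.load("table1-poller")
  for row in cursor:
      ts = row[4]
      stream_event(transform(row))
      if checkpoint is None or ts > checkpoint:  # strictly greater, never equal
          checkpoint = ts
          store.save("table1-poller", ts)
  # On restart, every row with last_updated == checkpoint is streamed again:
  # duplicates for the consumers, but no data loss.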

TLDR

Polling is hard

This is it. I hope you had fun reading through this, or that it helped you in some way if you ever need to implement one of these mean beasts. As always, if you have comments, doubts or requests, feel free to drop me an e-mail! (you'll find it in the footer of this blog)