Business Intelligence (BI) is about collecting substantial amounts of data from information systems to feed a Data Warehouse (DWH) or a data lake, which usually holds a copy of the data for BI applications. Different strategies can be applied to feed a DWH. One of them is Change Data Capture (CDC): the ability to capture changing states from a database and convert them into events that can be used for other purposes. Most databases are intended for OLTP purposes, and are well designed for this. Nonetheless, different use cases require the same data with different access patterns. These use cases (big data, ETL, and stream processing, to name a few) mostly fall under the OLAP banner. Mixing them would put the OLTP workload and the production environment at risk, so we need to enable OLAP in a non-intrusive way.
OVH, as a cloud provider, manages numerous databases, both for its customers and for its own needs. Managing a database lifecycle always involves keeping the infrastructure up to date, while remaining in sync with the development release cycle, to align the software with its database dependency. For example, an app might require MySQL 5.0, which could then be announced as EOL (End Of Life). In that case the app needs to be modified to support (let’s say) MySQL 5.5. We’re not reinventing the wheel here – this process has been managed by operations and dev teams for decades now.
This becomes trickier if you don’t have control over the application. For example, imagine a third party provides you with an application to ensure encrypted transactions. You have absolutely no control over this application, nor the associated database. Nonetheless, you still need the data from the database.
This blog post relates a similar example we encountered while building the OVH data lake, with the help of an in-house CDC development. This story takes place in early 2015, although I still think it’s worth sharing. 🙂
Designing a non-intrusive Change Data Capture process
It’s usually good practice to establish the state of the art before jumping into development, as it saves time and strengthens communities. Back in early 2015, when the CDC landscape was first emerging (Debezium, a similar open-source solution, only appeared at the end of that year), the only existing solution – Databus – came from LinkedIn. The Databus architecture was rather complex, and the project wasn’t very active. It also didn’t meet our security requirements, and since we come from a strong Ops culture, running a JVM on the database server was clearly a no-go for our Operations teams.
Although there was no CDC software matching our requirements, we eventually found a binlog replication library in Go that we could build upon. Binlog is MySQL’s replication log, its equivalent of a write-ahead log (WAL).
Our requirements were rather simple:
- Avoid JVM-based solutions (JVM and containers weren’t working well at the time, and it’d have been hard to get support from Ops)
- The CDC agent needed to connect to the CDC gateway for highly-secured environments (and not a gateway to agents)
- The CDC gateway could control its agents’ fleet
- The CDC agent could filter and serialise events to push them with back pressure control (see the sketch just after this list)
- The CDC agent could dump the DB to get a first snapshot, since binlogs aren’t always available from the beginning
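As a side note on the back-pressure requirement above, here is a minimal sketch (hypothetical code, not Menura itself) of how an agent can push serialised events through a bounded Go channel, so that the binlog reader blocks whenever the push side can’t keep up:

package main

import (
    "fmt"
    "time"
)

// event is a placeholder for a serialised change event.
type event []byte

func main() {
    // A bounded channel is the back-pressure mechanism: when the consumer
    // (the push to the CDC gateway) is slower than the producer (the binlog
    // reader), the producer blocks instead of buffering without limit.
    queue := make(chan event, 64)

    // Producer: the binlog reader side.
    go func() {
        for i := 0; i < 1000; i++ {
            queue <- event(fmt.Sprintf("change-%d", i)) // blocks when the queue is full
        }
        close(queue)
    }()

    // Consumer: the (simulated) slow push to the gateway.
    for ev := range queue {
        time.Sleep(10 * time.Millisecond) // simulate network latency
        fmt.Printf("pushed %s\n", ev)
    }
}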
Here is the global design of the ‘Menura’ project:
Menura is the genus of the lyrebird: a bird that can replicate any sound. Most BI-related components are ‘birds’, since they’re part of the Business Intelligence R&D project!
Automate the BI Control Plane
As Menura was deployed on database servers, it could reflect available databases and tables in the BI Control Plane, so that a user could ask to sync with a given table. The controlling protocol had a few simple tasks:
- Add and configure a database source
- Manage remote configuration
- Map agent/tables cartography
- Dump database tables
- Manage CDC (start/stop sync, commit binlog offsets…)
gRPC was only just emerging at the time, but with its strong foundations we saw in this project an opportunity to reconcile Protobuf, Thrift, and language diversity. Furthermore, the ability to set up bidirectional streaming RPCs was interesting for implementing client-to-server connections that still allow server-to-client RPCs, so we made it the foundation of the controlling protocol.
gRPC uses Protocol Buffers as its IDL to serialise structured data. Each StreamRequest is composed of a Header to manage multi-tenancy. This means that if two of our customers decided to give their sources the same name, we could still isolate control messages by tenant, not just by source. Each request then carries a RequestType, as defined in Protobuf v3:
enum RequestType {
    Control_Config = 0;
    Control_Hearbeat = 1;
    Control_MySQLClient = 10;
    Control_MySQLBinlog = 11;
    Control_Syslog = 12;
    Control_PgSQLClient = 13;
    Control_PgSQLWal = 14;
    Control_PgSQLLogDec = 15;
    Control_Kafka = 16;
    Control_MSSQLClient = 17;
}
This RequestType allowed us to reach source plugins with the specialised structures they expect. Note that we decoupled DB Clients from DB Replication (binlog, WAL…). The main reason is that they don’t share the same scope, so the libraries aren’t the same. It therefore made sense to keep them separate.
Another reason is that replication acts as a slave to the database, meaning a replication process leaves no significant footprint, while a client dumping the DB could imply locking rows or tables, depending on the database and its underlying engine. This could have led to us having two different slaves, or the replication plugged into a master and the client plugged into a slave.
These concerns drove us towards a modular design for the Menura agent:
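To give an idea of what this modularity looks like, here is a minimal, hypothetical sketch (the interface and plugin names are illustrative, not the actual Menura code) of source plugins registered behind a common interface and dispatched by RequestType:

package main

import "fmt"

// RequestType mirrors the Protobuf enum shown earlier.
type RequestType int32

const (
    ControlMySQLClient RequestType = 10
    ControlMySQLBinlog RequestType = 11
)

// SourcePlugin is a hypothetical interface shared by every source plugin
// (DB client, binlog replication, syslog, and so on).
type SourcePlugin interface {
    Name() string
    Handle(payload []byte) error
}

type mysqlClientPlugin struct{}

func (mysqlClientPlugin) Name() string { return "mysql-client" }
func (mysqlClientPlugin) Handle(payload []byte) error {
    fmt.Println("client control:", string(payload))
    return nil
}

type mysqlBinlogPlugin struct{}

func (mysqlBinlogPlugin) Name() string { return "mysql-binlog" }
func (mysqlBinlogPlugin) Handle(payload []byte) error {
    fmt.Println("binlog control:", string(payload))
    return nil
}

func main() {
    // The agent keeps a registry of plugins, keyed by the RequestType
    // received in a control StreamRequest.
    plugins := map[RequestType]SourcePlugin{
        ControlMySQLClient: mysqlClientPlugin{},
        ControlMySQLBinlog: mysqlBinlogPlugin{},
    }

    // Dispatch an incoming control message to the right plugin.
    dispatch := func(rt RequestType, payload []byte) {
        if p, ok := plugins[rt]; ok {
            _ = p.Handle(payload)
            return
        }
        fmt.Println("no plugin registered for request type", rt)
    }

    dispatch(ControlMySQLBinlog, []byte("start sync on db_a.table_a"))
    dispatch(ControlMySQLClient, []byte("dump db_a.table_a"))
}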
Filtering data
An important feature was the ability to filter events or columns. There were two reasons for this:
- We encountered databases with so many tables or columns that we needed to cut some noise
- We didn’t necessarily need to get certain data out of the database
Filtering as close to the DB as possible was certainly the best choice, particularly for our customers, as they could then add or verify filters themselves. To do this, we defined a Filter Policy to mimic IPTables policies, with accept or drop defaults. The source filter then described how tables and columns would behave, depending on the policy:
filter_policy: accept/drop
filter:
  table_a:
  table_b:
    ignored_columns:
      - sensibleColumn
With the drop policy, everything is dropped by default, except the tables explicitly listed in the filter. With the accept policy, everything is accepted by default: tables listed empty in the filter are dropped entirely, while tables that carry an ignored_columns key only have the named columns filtered out.
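To make the two policies more concrete, here is a small sketch of how such a policy could be evaluated per table and column (the types are hypothetical, not the actual Menura filter code):

package main

import "fmt"

// TableFilter mirrors one entry of the YAML filter above: an empty entry
// targets the whole table, while IgnoredColumns targets specific columns.
type TableFilter struct {
    IgnoredColumns []string
}

type FilterConfig struct {
    Policy string // "accept" or "drop"
    Filter map[string]TableFilter
}

// Keep reports whether a given table/column should be forwarded.
func (c FilterConfig) Keep(table, column string) bool {
    entry, listed := c.Filter[table]
    if c.Policy == "drop" {
        // Drop everything by default, except tables explicitly listed.
        return listed
    }
    // Accept policy: accept everything by default.
    if !listed {
        return true
    }
    if len(entry.IgnoredColumns) == 0 {
        return false // table listed empty: drop the whole table
    }
    for _, ignored := range entry.IgnoredColumns {
        if ignored == column {
            return false // drop only the listed columns
        }
    }
    return true
}

func main() {
    cfg := FilterConfig{
        Policy: "accept",
        Filter: map[string]TableFilter{
            "table_a": {},
            "table_b": {IgnoredColumns: []string{"sensibleColumn"}},
        },
    }
    fmt.Println(cfg.Keep("table_a", "id"))             // false: the whole table is dropped
    fmt.Println(cfg.Keep("table_b", "sensibleColumn")) // false: the column is dropped
    fmt.Println(cfg.Keep("table_b", "amount"))         // true
    fmt.Println(cfg.Keep("table_c", "anything"))       // true: not listed
}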
Validations in heterogeneous systems
For certain needs, you may want to confirm that an analytics job is processing exactly the same data as the source database contains. For example, computing revenue over a given period requires the true data from one date to another. Validating a replication state between a database and the data lake was challenging: integrity checks aren’t implemented with the same logic in databases and stores, so we needed a way to abstract them from the native implementation. We thought about using a Merkle Tree data structure, so that we could maintain a tree of integrity over blocks of rows. If a key/value differed between the two systems, the global or intermediate integrity hash would reflect it, and we would only have to scan the leaf block whose hash was inconsistent between both systems.
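As a rough illustration of the idea (not the validation code we actually ran), a Merkle root can be computed over row hashes on both sides and compared; on a mismatch, you only need to walk down the tree to the inconsistent leaf block:

package main

import (
    "crypto/sha256"
    "fmt"
)

// merkleRoot builds a binary Merkle tree over row hashes and returns its root.
// Comparing roots tells us whether two systems hold the same rows; comparing
// intermediate nodes narrows the search down to the mismatching block.
func merkleRoot(leaves [][32]byte) [32]byte {
    if len(leaves) == 0 {
        return sha256.Sum256(nil)
    }
    level := leaves
    for len(level) > 1 {
        next := make([][32]byte, 0, (len(level)+1)/2)
        for i := 0; i < len(level); i += 2 {
            if i+1 == len(level) {
                next = append(next, level[i]) // odd node: promote as-is
                continue
            }
            pair := make([]byte, 0, 64)
            pair = append(pair, level[i][:]...)
            pair = append(pair, level[i+1][:]...)
            next = append(next, sha256.Sum256(pair))
        }
        level = next
    }
    return level[0]
}

func hashRows(rows []string) [][32]byte {
    hashes := make([][32]byte, 0, len(rows))
    for _, r := range rows {
        hashes = append(hashes, sha256.Sum256([]byte(r)))
    }
    return hashes
}

func main() {
    db := []string{"1|42.00|2015-03-01", "2|13.37|2015-03-02"}
    lake := []string{"1|42.00|2015-03-01", "2|13.37|2015-03-02"}
    fmt.Println(merkleRoot(hashRows(db)) == merkleRoot(hashRows(lake))) // true: both sides hold the same rows
}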
Let’s put things together
As we stated in our introduction, CDC is set up to convert database changes into processable events. The goal here is to fulfil any business needs that require data in an efficient and effective way. Here are two examples of what we did with the data that we now had available as events…
Real-time joins between databases
While we were building the data lake from replicated tables, and since this project was mainly for BI purposes, we considered adding some real-time insights, based on the same logic we were using with batch and Apache Pig jobs. Back in 2015, the most advanced stream processing framework was Apache Flink, and we used it to process real-time joins between different databases and tables.
Alexis did an amazing job describing the join process that we injected into Apache Flink, so that in addition to replicating databases, we were also creating a new, aggregated table. Indeed, we could write an entire blog post just on this topic…
We chose Apache Flink for multiple reasons:
- Its documentation was delightful
- Its core engine was brilliant, and very far ahead of Apache Spark at the time (the Tungsten project wasn’t even there yet)
- It was a European project, so we were close to the editor and its community
Real-time indexing
Now that we had a real-time table fed into Apache HBase, we needed to add query capability on top of it. While HBase was fine from a storage point of view, it didn’t provide any search capability, and its access pattern wasn’t ideal for scanning over search criteria.
This is where Guillaume worked some magic! By reusing Lily, an HBase indexer that provided the concept of SEP (Side Effect Processor), he succeeded in reinjecting the aggregated table schema into Lily to build the data type mapping needed to read HBase byte array values, before indexing them into Solr. We now had a real-time dashboard of an aggregated real-time joined table, processed from real-time change data capture. Boom!
That was when we started getting real customers for our new solution.
Going live!
If there is still a need to demonstrate that testing in a simulation environment is not the same as testing in a production environment, this next part will likely settle the debate…
After setting up the data pipeline, we discovered a few bugs in the production environments. Here are two of them:
Interleaved events
As defined by MySQL, an event structure is composed of both a header and a data field.
In Row-based Replication (RBR), as opposed to Statement-based Replication (SBR), each row’s event is replicated with its corresponding data. DML statements are binlogged into two parts:
- A TABLE_MAP_EVENT
- A ROWS_EVENT (which can be WRITE, UPDATE or DELETE)
The first event, TABLE_MAP_EVENT, describes the metadata of the second event’s content. This metadata contains:
- Included fields
- Null values bitmaps
- The schema of the upcoming data
- The metadata for the provided schema
The second event, WRITE_ROWS_EVENT (for inserts), contains the data. To decode it, you need the previous TABLE_MAP_EVENT to know how to consume this event, matching the corresponding MYSQL_TYPE_* and reading the number of bytes expected for each type.
Occasionally, some events were not consumed properly: a misalignment between metadata and data led to VARCHAR values being decoded as DATETIME values, and so on.
After some debugging, it turned out that the DBA team had added triggers on some MySQL tables. When the replicas were rebuilt some days later, they inherited these triggers, and started to log the events the triggers produced.
The thing is, triggers are internal to MySQL. In the binlog, every event coming from the master is sent like this:
TableMapEvent_a
WriteEvent_a
TableMapEvent_b
WriteEvent_b
TableMapEvent_c
WriteEvent_c
a, b and c represent events for different schema.tables.
Since triggers don’t come from the master, when the slave receives a TableMapEvent for a specific table, it triggers another TableMapEvent for a specialised table (<table>_event). The same applies to the WriteEvent.
When MySQL triggers an event, it writes it into the binlog as well, so you end up with two interleaved TableMapEvents, followed by two RowsEvents, as shown below:
TableMapEvent_a
TableMapEvent_a_event
WriteEvent_a
WriteEvent_a_event
Got it! When we tried to decode WriteEvent_a, the most recent TableMapEvent was TableMapEvent_a_event, not TableMapEvent_a, so the library tried to decode the event with the wrong schema.
We had to find a way to match each WriteEvent to its corresponding TableMapEvent. Luckily, there is a TableID in both structures that we could use for this. We just had to buffer all TableMapEvents, making them available to all RowsEvents, then start reading the RowsEvent, pick its TableID, and fetch the column metadata from the matching TableMapEvent. Fixed!
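Here is a minimal sketch of the fix, with simplified, hypothetical event structs rather than the real library types: buffer every TableMapEvent in a map keyed by TableID, and resolve each RowsEvent against that map:

package main

import "fmt"

// Simplified, hypothetical views of the two binlog events involved.
type TableMapEvent struct {
    TableID     uint64
    Schema      string
    Table       string
    ColumnTypes []byte
}

type RowsEvent struct {
    TableID uint64
}

// tableMaps buffers every TableMapEvent seen so far, keyed by TableID, so that
// a RowsEvent arriving after an interleaved trigger event still resolves to
// the schema it was written with.
type tableMaps map[uint64]*TableMapEvent

func (t tableMaps) record(ev *TableMapEvent) { t[ev.TableID] = ev }

func (t tableMaps) decode(ev *RowsEvent) error {
    tm, ok := t[ev.TableID]
    if !ok {
        return fmt.Errorf("no TableMapEvent buffered for table id %d", ev.TableID)
    }
    fmt.Printf("decoding rows for %s.%s with %d column types\n",
        tm.Schema, tm.Table, len(tm.ColumnTypes))
    return nil
}

func main() {
    maps := tableMaps{}
    // Interleaved stream: a, then a_event, then the two rows events.
    maps.record(&TableMapEvent{TableID: 1, Schema: "db", Table: "a", ColumnTypes: []byte{3, 15}})
    maps.record(&TableMapEvent{TableID: 2, Schema: "db", Table: "a_event", ColumnTypes: []byte{3}})
    _ = maps.decode(&RowsEvent{TableID: 1}) // resolves to db.a, not db.a_event
    _ = maps.decode(&RowsEvent{TableID: 2})
}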
The elusive decimal…
We also encountered a seemingly arbitrary bug in the library, which caused Menura to explode. Again, we dug into the binlog library to debug the decoding process, step by step, and identified table/column tuples to limit the logging output to a more reasonable rate. A RowEvent looked like this:
DEBUG MR: TableMapEvent.read() : event.TableName = myTable
DEBUG MR: TableMapEvent.read() : columnCount= 16
DEBUG MR: TableMapEvent.read() : Read Next columnTypeDef= [3 3 15 246 0 0 12 0 15 12 12 15 3 3 15 15]
DEBUG MR: readStringLength() : r.Next(int(12))
DEBUG MR: TableMapEvent.read() : ReadStringLength columnMetaDef= [255 0 10 2 40 0 10 0 255 0 255 3]
DEBUG MR: TableMapEvent.read() : columnNullBitMap= [10 198]
DEBUG MR: TableMapEvent.read() : switch columnTypeDef[3]=f6
DEBUG MR: TableMapEvent.read() : switch : case metaOffset+=2
DEBUG MR: TableMapEvent.read() : column.MetaInfo
DEBUG MR: TableMapEvent.read() : column.MetaInfo = [10 2]
In this log, some parts of the decoding process are quite interesting and worth a closer look. First, the column type definitions give the table’s schema:
TableMapEvent.read() : Read Next columnTypeDef= [3 3 15 246 0 0 12 0 15 12 12 15 3 3 15 15]
Some of these data types require metadata to be read. For example, here is the corresponding log with column metadata:
TableMapEvent.read() : ReadStringLength columnMetaDef= [255 0 10 2 40 0 10 0 255 0 255 3]
Also, the NullBitMap column is important, since we have to know which null values should be ignored while decoding the buffer.
This crash didn’t happen on a daily basis, and the stacktrace didn’t point to a fixed part of the code. It looked like a shift in the decoding that caused arbitrary crashes whenever casting to the expected data type wasn’t possible. To debug at a deeper level, we needed to log more. And so we logged the buffer’s current offset, the size read for each data type, the metadata for each data type, and the value. Here is an example for a MYSQL_TYPE_NEWDECIMAL:
DEBUG MR: rowsEvent.read(): pack.Len() BEFORE : 59
DEBUG MR: rowsEvent.read(): Column.MetaInfo: &{246 [10 2] true}
DEBUG MR: rowsEvent.read(): switch column.Type= 246
DEBUG MR: case MYSQL_TYPE_NEWDECIMAL
DEBUG MR: readNewDecimal() precision= 10 scale= 2
DEBUG MR: readNewDecimal() size= 5
DEBUG MR: readNewDecimal() buff=8000000000
DEBUG MR: readNewDecimal() decimalpack=0000000000
DEBUG MR: rowsEvent.read(): pack.Len() AFTER : 54
DEBUG MR: rowsEvent.read(): value : 0
DEBUG MR: rowsEvent.read(): pack.Len() BEFORE : 54
DEBUG MR: rowsEvent.read(): Column.MetaInfo: &{0 [] false}
DEBUG MR: rowsEvent.read(): switch column.Type= 0
Looking back at the previous schema, we have 16 columns, and, according to the MySQL documentation, our data types provide metadata as per the following table:
type code | MYSQL_TYPE | Metadata (bytes)
---|---|---
3 | MYSQL_TYPE_LONG | 0
15 | MYSQL_TYPE_VARCHAR | 2
0 | MYSQL_TYPE_DECIMAL | 2
12 | MYSQL_TYPE_DATETIME | 0
246 | MYSQL_TYPE_NEWDECIMAL | 2
This gives us 18 bytes of metadata for this example schema, as opposed to the 12 bytes actually present in the packet.
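For illustration, the expected figure can be recomputed from the logged columnTypeDef and the per-type metadata lengths in the table above (a quick sketch, not part of the agent):

package main

import "fmt"

func main() {
    // Column type codes from the TABLE_MAP_EVENT log above.
    columnTypeDef := []byte{3, 3, 15, 246, 0, 0, 12, 0, 15, 12, 12, 15, 3, 3, 15, 15}

    // Metadata length in bytes per type code, as per the table above.
    metaLen := map[byte]int{
        3:   0, // MYSQL_TYPE_LONG
        15:  2, // MYSQL_TYPE_VARCHAR
        0:   2, // MYSQL_TYPE_DECIMAL (expected, but not actually sent)
        12:  0, // MYSQL_TYPE_DATETIME
        246: 2, // MYSQL_TYPE_NEWDECIMAL
    }

    expected := 0
    for _, t := range columnTypeDef {
        expected += metaLen[t]
    }
    fmt.Println(expected) // 18, versus the 12 bytes actually read (r.Next(int(12)) in the log)
}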
We also found that MySQL apparently didn’t send the metadata needed to read DECIMAL values in the packet. Was this normal behaviour?
The MySQL documentation is clear: to read a DECIMAL value, you need the metadata (precision, scale, etc.). Period.
However, we found that MYSQL_TYPE_DECIMAL was treated like MYSQL_TYPE_NEWDECIMAL:
case MYSQL_TYPE_DECIMAL, MYSQL_TYPE_NEWDECIMAL:
    value.value = pack.readNewDecimal(int(column.MetaInfo[0]), int(column.MetaInfo[1]))
We stepped back and looked at how this MYSQL_TYPE_DECIMAL was implemented in other binlog libraries. I’m no DBA, but it felt strange that schemas using DECIMAL values could actually end up using two different MySQL data types.
Okay… “Houston, we have a problem.”
First, nobody was implementing MYSQL_TYPE_DECIMAL, and for a very good reason: we shouldn’t have been receiving it at all, since it has been deprecated since MySQL 5.0. This meant the database behind was running a table created on (at best) MySQL 4.9, and the server had since been upgraded without a proper ALTER to convert the data types to MYSQL_TYPE_NEWDECIMAL.
Second, since we didn’t have any control over the database, how could we decode a MYSQL_TYPE_DECIMAL…
First attempt: Ignore it
We first circumvented this issue by actually not reading two bytes of metadata when we parsed a MYSQL_TYPE_DECIMAL column. This stopped corrupting the metadata offset, and other data types were now aligned with their metadata.
We missed the decimal value, but we could continue to read other data types. Well, sort of… It was better, but to read values after a MYSQL_TYPE_DECIMAL in the data buffer, we needed to know how many bytes to read.
Second attempt: The naïve approach (i.e. guessing!)
A decimal value is a fractional number, stored by MySQL in a packed, fixed-point format. For example, a DECIMAL(10,2) column has eight integer digits and two fractional digits. The number of digits determines the number of bytes to be read: here, four bytes for the integer part and one byte for the fractional part. This would have been so simple… if we had the metadata.
In practice, MySQL didn’t provide any metadata for DECIMAL values, which is why we ignored them in the first iteration, to preserve the other data. Have you ever tried to decode old binlogs with the official mysqlbinlog tool? If you had a MYSQL_TYPE_DECIMAL in your data, it would stop decoding right there. Yes… MySQL doesn’t know how to decode its own data format!
One could argue that if MySQL doesn’t provide any metadata, it’s because it stores it internally, at a fixed size. Well… no!
sql value | byte array | type
---|---|---
0.00 | 32 32 32 32 32 32 32 48 46 48 48 | decimal(10,2)
0.000 | 32 32 48 46 48 48 48 | decimal(5,3)
Here’s how it actually works… Decimals are encoded like a VARCHAR in the protocol. I tried to read the value, skipping the space padding, flagging the dot when encountered, and reading fractional digits for as long as they seemed coherent for a decimal. When they weren’t, I unread the last byte in the buffer and continued with the next data type. And it worked. For a time…
DEBUG MR: case MYSQL_TYPE_DECIMAL
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: continue
DEBUG MR: readOldDecimalV2: byte = 48
DEBUG MR: readOldDecimalV2: start writing
DEBUG MR: readOldDecimalV2: byte = 46
DEBUG MR: readOldDecimalV2: dot found
DEBUG MR: readOldDecimalV2: writing
DEBUG MR: readOldDecimalV2: byte = 48
DEBUG MR: readOldDecimalV2: writing
DEBUG MR: readOldDecimalV2: byte = 48
DEBUG MR: readOldDecimalV2: writing
DEBUG MR: readOldDecimalV2: byte = 32
DEBUG MR: readOldDecimalV2: unread, break
DEBUG MR: readOldDecimalV2: converting to float : 0.00
DEBUG MR: rowsEvent.read(): pack.Len() AFTER : 43
DEBUG MR: rowsEvent.read(): value : 0
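Here is roughly what that guessing heuristic looked like – a simplified sketch, not the library’s actual readOldDecimalV2:

package main

import (
    "bytes"
    "fmt"
    "strconv"
)

// readOldDecimalGuess tries to read a space-padded, VARCHAR-like decimal
// (e.g. "       0.00") from the buffer, unreading the first byte that no
// longer looks like part of the number. Without metadata, this is only a
// guess at where the value ends.
func readOldDecimalGuess(buf *bytes.Buffer) (float64, error) {
    var out []byte
    started := false
loop:
    for {
        b, err := buf.ReadByte()
        if err != nil {
            break // end of buffer
        }
        switch {
        case b == ' ' && !started:
            continue // leading space padding
        case (b >= '0' && b <= '9') || b == '.' || (b == '-' && !started):
            started = true
            out = append(out, b)
        default:
            _ = buf.UnreadByte() // not part of the decimal: give it back
            break loop
        }
    }
    return strconv.ParseFloat(string(out), 64)
}

func main() {
    // The byte array for 0.00 in a decimal(10,2) column (see the table above),
    // followed by one byte belonging to the next value.
    buf := bytes.NewBuffer([]byte{32, 32, 32, 32, 32, 32, 32, 48, 46, 48, 48, 0x07})
    v, err := readOldDecimalGuess(buf)
    fmt.Println(v, err, buf.Len()) // 0 <nil> 1
}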
We could only hope we wouldn’t encounter a following VARCHAR value whose length byte could itself be parsed as a digit. The dynamic size of a DECIMAL value means that metadata should really be available to read it properly. There is no other way.
Third attempt: There is no compromise when it comes to being a good slave!
We asked ourselves what makes mysqlbinlog different from a MySQL slave when it comes to reading binlogs. We found that the only difference was that a true slave knows the DECIMAL schema and its associated metadata when receiving the data. So it doesn’t have to guess anything – it just reads the right number of bytes, according to the known schema.
We ended up implementing a MySQL client in our mysqlbinlog source, which initially dumps the table schemas to pass the NumericScale value to the decoding library. The pitfall here is that columns aren’t identified by their ColumnName in the binlog protocol. MySQL maintains an OrdinalPosition for the columns in a table, but it’s not the ID provided in the binlog protocol (that would be too easy!). You have to maintain your own column index, built from the schema, so that it matches the one you receive in the binlog protocol. Once you have it, just look up the decimal scale to know how many bytes are still to be read after the dot.
This way, the decoding library was now capable of decoding MYSQL_TYPE_DECIMAL from the binlog stream of events. Hooray!!
TL;DR
In the end, building a BI stack from scratch took approximately six months. The team was composed of 2.5 people: Alexis Gendronneau, Guillaume Salou (who joined us after three months) and me. It demonstrated the principle of Change Data Capture applied to real-life use cases, enabling real-time insights into sales, stocks, and more, without any impact on production environments. The team grew as the project extended its scope, with new, more challenging customers, like financial services and management control teams. Weeks later, we succeeded in launching it on Apache Flink, based on the same data pipeline that has since become the trusted source for revenue calculation and other business KPIs.
We learned a lot from this project. A key lesson is how important it is to keep your technical debt under control, and what impact it can have on other teams and projects. Also, working with Apache Flink on a range of projects proved to be a wonderful experience for our teams.
The whole team delivered great work, and Dimitri Capitaine is about to open-source the data collector agent that powered the preview labs: OVH Data Collector. If you’re interested in discussing Change Data Capture at OVH in greater depth, feel free to join us on the team’s Gitter, or DM me on Twitter.