We have open sourced Aesop, the Flipkart Change Propagation system following the announcement of intent at slash n – the Flipkart tech conference. This post describes use of Aesop in one of our production systems.
PayZippy is an online payment solution launched by Flipkart – a safe and easy way for e-commerce users and online merchants to make and accept payments.
Need for change event propagation
It is fairly common for products and services to use relational databases to store business-critical data. These data stores therefore become the source of truth for such data.
However, relational databases may not scale well for all kinds of use cases. Analytics, reporting, search indexing and similar use cases need this data in secondary data stores, some of which are non-relational and more efficient at handling such workloads. There is then a need for a system to transfer data from the primary data store to these secondary data stores.
A number of tools and products in the ETL (Extract, Transform, and Load) space may be used to transfer data. However, these are batch-oriented and do not transfer data in real time.
At PayZippy, data from these secondary stores is used to feed more than just business decisions. It feeds into real-time use cases like the Console, Fraud Detection and Monitoring systems.
We therefore needed a system that can transfer data across data stores in real time.
Aesop is a keen observer of changes that can also relay change events reliably to interested parties. It provides useful infrastructure for building Eventually Consistent data sources and systems.
- Relay Server : Reads changes from the source data stores, converts them to a serializable form using the schema registry, and stores the events in an internal buffer. It listens to requests from clients and transports events to them, providing each client with only those events pertaining to the sources and partitions for which it has registered.
- Single Client : Calls the Relay Server to check for new events. On receipt of events from the server it executes business logic – e.g. writing to a destination data store – and checkpoints the SCN. If it falls off the relay, it connects to the Bootstrap for events and reconnects to the Relay once it catches up.
- Participant in Cluster Client : Calls the Relay Server for events only for the partitions assigned to the client. It processes cluster status change events from Helix and acts accordingly, and checkpoints the SCN to shared storage.
- Bootstrap Producer : Similar to a client. It checks for new data change events on relays and stores those events in a MySQL database, which is used for bootstrap and catch-up for clients.
- Bootstrap Server : Similar to the Relay Server. It listens for requests from Databus clients and returns long look-back data change events for bootstrapping and catch-up.
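The interplay of these components can be illustrated with a minimal, self-contained Python sketch. The `Relay` and `Client` classes and their methods are our own illustrative names, not Aesop's actual API; the relay keeps recent events in a bounded buffer, the client checkpoints the SCN it last consumed, and falls back to a bootstrap store when its checkpoint has fallen off the relay:

```python
from collections import deque

class Relay:
    """Toy relay: keeps only the most recent events in a bounded in-memory buffer."""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)  # oldest events are overwritten

    def publish(self, scn, event):
        self.buffer.append((scn, event))

    def events_after(self, scn):
        """Return events newer than the client's checkpoint, or None if that
        checkpoint has already been overwritten (client must bootstrap)."""
        if self.buffer and scn < self.buffer[0][0] - 1:
            return None  # client has fallen off the relay
        return [(s, e) for s, e in self.buffer if s > scn]

class Client:
    """Toy client: pulls from the relay, checkpoints the SCN, bootstraps on miss."""
    def __init__(self, relay, bootstrap_events):
        self.relay = relay
        self.bootstrap = bootstrap_events  # long look-back store
        self.checkpoint = 0                # last consumed SCN

    def poll(self):
        batch = self.relay.events_after(self.checkpoint)
        if batch is None:  # fall back to bootstrap, then catch up on the relay
            batch = [(s, e) for s, e in self.bootstrap if s > self.checkpoint]
        for scn, _event in batch:
            self.checkpoint = scn  # checkpoint after processing each event
        return batch
```

The real components are separate processes communicating over HTTP, with the bootstrap backed by MySQL; this sketch only captures the checkpoint/fall-off/catch-up protocol.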
High Availability, Load Balancing and Scaling
High availability, load balancing and scaling are built into Aesop. Since PayZippy relies on the data moved via Aesop for real-time use cases, these capabilities were essential to the strategy.
- High availability and load balancing on the clients are achieved by having partitioned clients participate in a cluster, with each client handling a partition (a part of the data set). The partitions are dynamically assigned via Helix. The default partition function used to filter the events is MOD.
- Partition reassignment when instances join or leave the cluster is automatic and the load (number of partitions assigned) is uniform across instances.
- The filtering itself is performed at the Relay and Bootstrap. Checkpoints for partitions are stored in ZooKeeper. Aesop uses the partitioning functionality available in LinkedIn Databus.
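The MOD partition function and server-side filtering described above can be sketched as follows. The function names are ours for illustration; the idea is simply that an event's key modulo the partition count determines its partition, and the server returns only events in the partitions a client has registered for:

```python
def mod_partition(key, num_partitions):
    """Default MOD partition function: event key modulo partition count."""
    return key % num_partitions

def events_for_client(events, assigned_partitions, num_partitions):
    """Server-side filtering: keep only events in this client's partitions."""
    return [e for e in events
            if mod_partition(e["key"], num_partitions) in assigned_partitions]
```

Because the mapping is a pure function of the key, any relay or bootstrap node can filter consistently, and Helix can reassign partitions without coordinating with the data path.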
Relay – HA
High availability of the Relay Server is achieved using one of the following approaches:
Multiple Relay Servers reading from the source data stores
- The Clients connect to Relay Server via a load balancer.
- Since requests from clients are over HTTP, one or both of the Relay Servers can serve requests, based on the configuration of the load balancer.
- When one Relay goes down, the other can still handle requests.
Relay Chaining with HA using Leader Follower model (Not yet implemented)
- A Relay Producer reads from another Relay. This Relay can act as a normal Relay for clients.
- Each Relay has both producers – a Source Producer and a Relay Producer – but only one is active at a time. On the leader Relay the Source Producer is active; on a follower Relay the Relay Producer is active.
- Leader/Follower election is done using Helix.
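A minimal sketch of the leader/follower producer switch, assuming a Helix-style role-change callback (the class and callback names are ours, not the actual Helix or Aesop API):

```python
class ChainedRelay:
    """Toy relay holding both producers; an election callback activates one."""
    def __init__(self, name):
        self.name = name
        self.active_producer = None

    def on_role_change(self, role):
        # The leader pulls directly from the source; followers chain off
        # the leader relay, so only one relay ever reads the source.
        self.active_producer = "source" if role == "LEADER" else "relay"
```

On failover, Helix would promote a follower to leader, flipping its active producer from the relay chain to the source without any client-visible change.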
Refer to the Aesop GitHub wiki for more information on Aesop architecture and design.
Aesop at PayZippy
Blocking Startup Bootstrap
We needed to transfer existing data from MySQL to destination data stores. We assumed we could use the same Relay Server and client mechanism by pointing the Relay to the first bin log.
However we faced the following issues:
- The throughput of the client was dependent on that of the destination data store, and turned out to be far less than that of the Relay – the Relay was twice as fast as the client.
- The clients would fall off the Relay. The buffer in the Relay does not help: since the Relay's throughput is far higher than the client's, the buffer fills up and starts overwriting events even before the client has been able to pull them.
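The fall-off behaviour is easy to demonstrate with a small simulation (all names and rates here are illustrative, not measured from Aesop): a relay appends events to a bounded buffer at one rate per tick while the client consumes at a slower rate, and every event overwritten before the client reads it is lost.

```python
from collections import deque

def simulate(total_events, buffer_size, relay_rate, client_rate):
    """Count events a client loses when the relay outruns its bounded buffer."""
    buffer = deque(maxlen=buffer_size)   # oldest entries are silently overwritten
    produced, checkpoint, lost = 0, 0, 0
    while produced < total_events:
        for _ in range(relay_rate):      # relay tick: append new events
            produced += 1
            buffer.append(produced)
        oldest = buffer[0]
        if checkpoint + 1 < oldest:      # client's next event was overwritten
            lost += oldest - checkpoint - 1
            checkpoint = oldest - 1      # in practice the client must bootstrap here
        for _ in range(client_rate):     # client tick: pull what it can
            if checkpoint < produced:
                checkpoint += 1
    return lost
```

With a relay twice as fast as the client (the ratio we observed), the client steadily loses events no matter how it catches up within the buffer; an equal-rate client loses none. This is why a separate bootstrap path, rather than a bigger buffer, was the fix.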
We therefore built a separate blocking bootstrap pipeline with the following components:
- Thin Producer : A producer similar to the one in the Relay, which can pull events from the source or have events pushed to it from the source. Processing related to serialization is skipped.
- Partitioned Consumers : Each consumer consumes only its particular partition, in order, and executes business logic – in this case, inserting into the destination data store.
The consumer currently supported in PayZippy writes to a destination data store – e.g. denormalized MySQL or HBase. The consumer calls the Event Mapper and Transformer to get an event conforming to the schema of the destination data store. The transformed event is then written to the destination data store.
Monitoring and Alerting
Monitoring and alerting were important as the change propagation system is being used for real-time use cases. They are supported using:
- JMX Monitor (soon to be open sourced) connects to the JVMs running Aesop, fetches metrics and publishes them to StatsD/Graphite. It also raises alerts based on configured thresholds.
- Skyline (changes by Flipkart are not yet open sourced) is used to raise alerts based on the configured algorithm.
- Aesop Dashboard – provides a real-time view of the change propagation pipeline.
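Publishing a metric to StatsD is a one-line UDP datagram in the standard `name:value|type` wire format. The sketch below is not the JMX Monitor code – just the wire format it would emit for a counter, with an assumed metric name:

```python
import socket

def statsd_counter(name, value=1, host="127.0.0.1", port=8125):
    """Send a StatsD counter datagram (fire-and-forget UDP) and return the payload."""
    payload = f"{name}:{value}|c".encode()
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(payload, (host, port))  # no ack: metrics never block the pipeline
    sock.close()
    return payload.decode()
```

Fire-and-forget UDP is the usual choice here precisely so that a slow or down metrics backend can never back-pressure the change propagation pipeline itself.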
We ran a few benchmarks for Aesop on standard 4 core, 16GB virtual machines running 64-bit Debian Linux. The Aesop components were run on separate VM instances. Performance numbers from the benchmarks:
Relay Server : Handles 19 million events an hour per data source, i.e. 18.2 GB per hour.
Relay Client : Can process data at the same speed as the server; however, it is limited by the throughput of the destination database. With the MySQL client we are able to insert/update events at the rate of 10 million events an hour.
Latency : The latency between the source data store and the destination data store is within 80-90 milliseconds.
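A quick back-of-the-envelope check of the relay numbers above (treating GB as GiB, an assumption on our part) puts the average event at roughly 1 KB and the sustained rate at roughly 5,300 events per second:

```python
# Derived from the benchmark figures quoted above.
events_per_hour = 19_000_000
gb_per_hour = 18.2

avg_event_bytes = gb_per_hour * 1024**3 / events_per_hour  # ~1 KB per event
events_per_sec = events_per_hour / 3600                    # ~5,300 events/sec
```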
Appendix : Design Decisions
Dual Writes v/s Log Mining
Of the various ways of having data in multiple stores, two were evident.
Dual Writes : The application writes to destination data stores, synchronously or asynchronously. The application can also write to a publisher-subscriber system in which the subscribers are consumers that eventually write to the destination data stores.
- Pros : Appears easy – the application can publish the same event that is being inserted/updated in the primary data store.
- Cons : Difficult to maintain consistency
- For cases where there are bulk updates (updates with a non-primary-key WHERE clause), the application would then have to fetch the affected records and publish the changes.
- Difficult to ensure all applications use the common data layer which publishes changes as well.
- Manual changes in Primary Data Store will be missed.
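The bulk-update problem is worth seeing concretely. In the sketch below (an illustrative in-memory SQLite example with made-up table and column names), a single UPDATE with a non-primary-key WHERE clause changes several rows, so a dual-writing application cannot know what to publish without re-querying:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE payments (id INTEGER PRIMARY KEY, status TEXT, merchant TEXT)")
conn.executemany("INSERT INTO payments VALUES (?, ?, ?)",
                 [(1, "PENDING", "m1"), (2, "PENDING", "m1"), (3, "PENDING", "m2")])

# Bulk update with a non-primary-key WHERE clause: the application does not
# know up front which rows changed ...
conn.execute("UPDATE payments SET status = 'SETTLED' WHERE merchant = 'm1'")

# ... so to dual-write it must re-query the affected rows before publishing,
# and any write that races in between makes the published events inconsistent.
changed = conn.execute(
    "SELECT id FROM payments WHERE merchant = 'm1' ORDER BY id").fetchall()
```

A log miner sidesteps this entirely: the commit log already contains one record per changed row, in commit order.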
Log Mining : A separate application/service extracts changes from the database commit logs and publishes them, using the same approach the database itself uses for replication.
- Pros : Consistency can be guaranteed, as changes are being read from the commit logs (the bin log in the case of MySQL).
- Cons : Appears tough – but definitely possible.
- Tied to the mechanism used by the database for replication, the commit log format, etc. – a tightly coupled approach.
Since Consistency across Datastores is of paramount importance to a financial system like PayZippy we chose the Log Mining approach.
Approaches to Log Mining – Bin Log Parser vs Storage Engine
MySql Bin Log Parsing
- Pros : Familiar approach.
- Open source software was available that parses MySQL bin logs: Open Replicator and Tungsten Replicator.
- Cons : If the format of the bin logs changes, the parser would have to change.
- Open Replicator supported MySQL version 5.5. We would have to modify Open Replicator to support MySQL v5.6 and the checksum feature introduced in v5.6.
Custom Storage Engine
- Pros : Independent of the binlog format; layers above the storage engine take care of parsing.
- Cons : Unfamiliar approach. Unknown pitfalls.
We decided to go with known pitfalls and picked the bin log parsing approach.