Category Archives: Engineering

Flipkart Lite — The how?

To know what Flipkart Lite is, read our previous article on the story behind building it: Progressive Web App: A New Way to Experience Mobile

The tech behind the application

Well, where do I even start? The following is a list of most of the tech behind Flipkart Lite, in NO particular order. Many thanks to all the authors and contributors of these tools, libraries, frameworks, specifications, etc…

Front-end:

Tools — Build and Serve:

Web platform:

And a few more that I’m probably missing.

Some of the things listed above come with the tag “Experimental Technology”. But, trust me, take a leap of faith, play around, implement it in your app and push it to production. You’ll definitely feel proud.

The Monolith

The architecture decision started with choosing between “one big monolith” and “split the app into multiple projects”. In our previous projects we had one big monolithic repository, and getting one’s code to master wasn’t a pleasant experience for many developers. The problem with all developers contributing to a single repository is enabling them to push their code to production quickly: new features flow in every day, and deployments kept slowing down. Every team wants to get its feature into master, and effectively into production, ASAP. The quintessential requirement of every single person pushing code to a branch is that it gets reviewed and merged within the next hour, pushed to a pre-production-like environment in the next 5 minutes, and ultimately to production on the same day. But how can you build such a complicated app from scratch right from day 1?

We started with a simple system and added complexity

Separate concerns

The goal was to have the following DX (Developer Experience) properties:

  • To develop a feature for a part of the application, the developer needs to checkout only the code for that particular application
  • One can develop and test one’s changes locally without any dependencies on other apps
  • Once the changes are merged to master, these changes should be deployed to production without depending/waiting on other applications

So we planned to separate our app into different projects, with each app representing a product flow (e.g. the Home-Browse-Product or pre-checkout app, the Checkout app, the Accounts app, etc.), but without sacrificing the common things they can share with each other. This could have felt like restricting technology choices, but everyone agreed to use React, Phrontend and webpack :). So it became easy to add simple scripts to each project and automate the build and deploy cycles.

Out of these different apps, one is the primary one — the Home-Browse-Product app. Since the user enters Flipkart Lite through one of the pages in this app, let’s call it the “main” app. I’ll be talking only about the main app in this article, as the other apps use a subset of the same tooling and configuration as the “main” app.

Home-Browse-Product

The “main” app takes care of 5 different entities. Since we use webpack, each of the entities just became a webpack configuration.

  • vendors.config.js : react, react-router, phrontend, etc…
  • client.config.js : client.js → routes.js → root component
  • server.config.js : server.js → routes.js → root component
  • hbs.config.js : hbs-loader → hbs bundle → <shell>.hbs
  • sw.config.js : sw.js + build props from client and vendor

vendors.config.js

This creates a bundle of React, react-router, phrontend and a few other utilities.

{
  entry: {
    react: "react",
    "react-dom": "react-dom",
    "react-router": "react-router",
    phrontend: "phrontend",
    ...
  },
  output: {
    library: "[name]",
    libraryTarget: "this",
    ...
  },
  plugins: [
    new webpack.optimize.CommonsChunkPlugin({ name: "react" }),
    new CombineChunksPlugin({ prelude: "react" })
  ]
}

This is roughly the configuration for the vendors js. We tried different approaches and this worked for us. So the concept is to

  • specify different entry points
  • use CommonsChunkPlugin to move the runtime to this chunk
  • use CombineChunksPlugin (I’ve uploaded the source here) to combine all the chunks to one single file with the runtime chunk at the top

As a result, we get one single file — vendors.<hash>.js, which we then sync to our CDN servers.

The vendors list is passed down the build pipeline to the other apps as well, and each of them (including main) externalises the vendors so that they are consumed from vendors.bundle.js:

{
  externals: vendors
}

client.config.js

As the name says, this is the configuration that builds the client app. Here we do minification and extract the CSS into a single file.
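A trimmed-down sketch of what such a configuration could look like with webpack 1 (the entry paths, loaders and plugin options here are illustrative, not our exact production config):

var path = require('path');
var webpack = require('webpack');
var ExtractTextPlugin = require('extract-text-webpack-plugin');
var vendors = require('./vendors'); // hypothetical: the shared vendor list from the build pipeline

module.exports = {
  entry: { app: './src/client.js' },              // illustrative entry point
  output: {
    path: path.join(__dirname, 'build/client'),
    filename: '[name].[chunkhash].js'             // hashed names drive cache busting
  },
  externals: vendors,                             // vendor modules come from vendors.bundle.js
  module: {
    loaders: [
      { test: /\.js$/, exclude: /node_modules/, loader: 'babel' },
      // pull all imported CSS out of the JS bundle into a single stylesheet
      { test: /\.css$/, loader: ExtractTextPlugin.extract('style', 'css') }
    ]
  },
  plugins: [
    new ExtractTextPlugin('[name].[contenthash].css'),
    // minify for production
    new webpack.optimize.UglifyJsPlugin({ compress: { warnings: false } })
  ]
};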

server.config.js

Why do we even need a server.config.js? Why do we need to bundle something for the server?

We use webpack heavily and rely on some conventions — we add custom paths to module resolution, import CSS files, use ES6 code — and Node cannot understand most of this natively. Also, we don’t want to bundle every single dependency into one single file and run it with Node. Webpack provides a way to externalise those dependencies, leaving the requires for those externals untouched, and this is one way of doing it:

var fs = require('fs');

var nodeModules = {};
fs.readdirSync('node_modules').filter(function(m) {
  return ['.bin'].indexOf(m) === -1;
}).forEach(function(m) {
  nodeModules[m] = 'commonjs2 ' + m;
});

and in the webpack configuration,

{
  output: {
    libraryTarget: "commonjs2",
    ...
  },
  externals: nodeModules,
  target: "node"
}

sw.config.js

The sw.config.js bundles sw-toolbox, our service worker code and the build versions from vendors.config.js and client.config.js. When a new app version is released, since we use [hash]es in the file names, the URL to the resource changes and that shows up as a new sw.bundle.js. Since we now have a byte diff in sw.bundle.js, the new service worker kicks in on the client and updates the app, and this takes effect from the next refresh.
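A sketch of how those build versions could be injected into the service worker bundle, shown here with DefinePlugin and made-up constant names; treat the mechanism and names as illustrative rather than our exact setup:

var path = require('path');
var webpack = require('webpack');

// Hypothetical: hashes produced by the vendors and client builds, read from their build output
var vendorHash = process.env.VENDOR_HASH;
var clientHash = process.env.CLIENT_HASH;

module.exports = {
  entry: { sw: './src/sw.js' },          // imports sw-toolbox and our caching routes
  output: {
    path: path.join(__dirname, 'build'),
    filename: 'sw.bundle.js'             // not hashed: served from a stable URL
  },
  plugins: [
    // Bake the current build's asset names into the service worker. A new release
    // changes these strings, which changes sw.bundle.js byte-for-byte and makes
    // the browser install the updated service worker.
    new webpack.DefinePlugin({
      __VENDOR_BUNDLE__: JSON.stringify('vendors.' + vendorHash + '.js'),
      __APP_BUNDLE__: JSON.stringify('app.' + clientHash + '.js')
    })
  ]
};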

hbs.config.js

We use two levels of templating before sending some markup to the user. The first one is rendered during build time (hbs → hbs). The second one is rendered during runtime (hbs → html). Before proceeding further into this, I’d like to talk about HTML Page Shells and how we built them with React and react-router.

HTML Page Shells

Detailed notes on what HTML Page Shells or Application Shells are can be found here — https://developers.google.com/web/updates/2015/11/app-shell. This is how one of our page shells looks —

The left image shows the “pre-data” state — the shell, and the right one shows the “post-data” state — the shell + content. One of the main things this helps us achieve is this —

          Perceived > Actual

It’s been understood for quite some time that what the user perceives is the most important thing in UX. For example, a splash screen informs the user that something is loading and gives some sense of progress. Displaying a blank screen is of no use to the user at all.

So I want to generate some app shells. I have a React application and I’m using react-router. How do I get the shells?

Gotchas? Maybe.

We did try some stuff and I’m going to share what worked for us.

componentDidMount

This is a lifecycle hook provided by React that runs ONLY on the client. On the server, the render method for a component is called but componentDidMount is NOT invoked. So we place all our API-calling Flux actions inside it, and we construct the render methods of all our top-level components carefully so that the first render gives out the shell instead of an empty container.
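A minimal sketch of the pattern (the component, store and action names below are made up for illustration):

var React = require('react');
var ProductActions = require('../actions/ProductActions'); // hypothetical Flux action creators
var ProductStore = require('../stores/ProductStore');      // hypothetical Flux store
var ProductShell = require('./ProductShell');              // hypothetical "pre-data" markup
var ProductDetails = require('./ProductDetails');          // hypothetical "post-data" markup

var ProductPage = React.createClass({
  getInitialState: function() {
    return { product: ProductStore.get(this.props.params.itemId) || null };
  },

  componentDidMount: function() {
    // Runs only in the browser. During React.renderToString at build time this is
    // skipped, so no API call is made and the component stays in its shell state.
    ProductStore.addChangeListener(this.onChange);
    ProductActions.fetch(this.props.params.itemId);
  },

  componentWillUnmount: function() {
    ProductStore.removeChangeListener(this.onChange);
  },

  onChange: function() {
    this.setState({ product: ProductStore.get(this.props.params.itemId) });
  },

  render: function() {
    if (!this.state.product) {
      // "Pre-data" state: render the page shell, never an empty container
      return <ProductShell />;
    }
    return <ProductDetails product={this.state.product} />;
  }
});

module.exports = ProductPage;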

Parameterised Routes

We decided that we would create shells for every path, and not one simple generic shell that you can use for anything. We found all the paths that the application used and would use, wrote a small script that iterated through all the routes we defined for react-router, and we had this —

/:slug/p/:itemid
/(.*)/pr
/search
/accounts/(.*)

During build time, how can you navigate to a route (so as to generate the HTML Page Shell for that route) when an expression in the route is resolved only during run time?

            Hackity Hack Hack Hack

React-router provides a utility function that allows you to inject values into the params in the URI. So it was easy for us to simply hack it and just inject all the possible params into the URIs.

function convertParams(p) {
  return PathUtils.injectParams(p, {
    splat: 'splat',
    slug: 'slug',
    itemId: 'itemId'
  });
}

Note: We used react-router 0.13. The APIs might have changed in later versions. But something similar would be used by react-router.

And now we get this route table.

Route Defined       Route To Render       Page Shell
/:slug/p/:itemid  → /slug/p/itemId     → product
/(.*)/pr          → /splat/pr          → browse
/search           → /search            → search
/accounts/(.*)    → /accounts/splat    → accounts

It simply works because the shell we are generating does NOT contain any content. It is the same for all similar pages (say, every product page).
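For completeness, here is a condensed sketch of the shell-generation step with react-router 0.13 (the file layout and variable names are illustrative):

var React = require('react');
var Router = require('react-router');
var routes = require('./routes');   // the same route definitions the client app uses

// One parameter-injected path per shell type, i.e. the table above
var shellRoutes = {
  product: '/slug/p/itemId',
  browse: '/splat/pr',
  search: '/search',
  accounts: '/accounts/splat'
};

Object.keys(shellRoutes).forEach(function(name) {
  Router.run(routes, shellRoutes[name], function(Handler) {
    // componentDidMount never fires here, so every component renders its
    // "pre-data" markup (the shell) without making any API calls.
    var markup = React.renderToString(React.createElement(Handler));
    // markup is then injected into index.hbs to produce <name>.hbs (e.g. product.hbs)
  });
});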

Two-level templating

We are back to hbs.config.js. So we have one single hbs file — index.hbs — with the following content.

// this is where the build-time rendered app shell goes in
// React.renderToString output
<div id="root">{{content}}</div>

// and some stuff for the second level
// notice the escape
<script nonce="\{{nonce}}"></script>
<script src="\{{vendorjs}}"></script>
<script src="\{{appjs}}"></script>

Build-time rendering: for each type of route, we generate a shell, inject it as {{content}} into index.hbs, and get the hbs for that particular route — for example, product.hbs.

Runtime rendering: we insert the nonce, bundle versions and other small variables into the corresponding app shell hbs and render it to the client.

The good thing is that the user gets rendered HTML content before the static resources load, parse and execute. And this is faster than server-side rendering, as it is as good as serving a static HTML file. The only variables in the template are a few values fetched from in-memory lookups. This improves the response time and the time to first paint.
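At runtime the render step is little more than a template lookup. A sketch with Express and an hbs view engine (variable names follow the template above; the build-props map is a hypothetical stand-in for however the bundle names reach the server):

var express = require('express');
var crypto = require('crypto');

// Hypothetical: hashed bundle names written out by the build pipeline
var buildProps = require('./build-props.json');

var app = express();
app.set('view engine', 'hbs');   // assumes an hbs view engine (e.g. the "hbs" package) is installed

app.get('/:slug/p/:itemId', function(req, res) {
  // No API or database calls here, just an in-memory lookup plus a template render,
  // which keeps the response close to static-file speed.
  res.render('product', {                       // product.hbs was generated at build time
    nonce: crypto.randomBytes(16).toString('base64'),
    vendorjs: buildProps.vendorBundle,
    appjs: buildProps.appBundle
  });
});

app.listen(8080);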

The end

And with all this and a lot of gotchas that I missed out, we put Flipkart Lite into production successfully. Oh wait! This didn’t start as a story. Anyway. Thanks for reading till here.

All the solutions described above are NOT necessarily the best solutions out there. This was one of our first attempts at solving these problems. It worked for us and I’m happy to share it with you. If you find improvements, please do share them :).

– by Boopati Rajaa, Flipkart Web Team

Progressive Web App: A New Way to Experience Mobile

There have been a few turning points in the history of the web platform that radically changed how web apps were built, deployed and experienced. Ajax was one such pivot that led to a profound shift in web engineering. It allowed web applications to be responsive enough to challenge conventional desktop apps. However, on mobile, the experience was defined by native apps and web apps hardly came close to them, at least until now. The Mobile Engineering team at Flipkart discovered that with the right set of capabilities in a browser, a mobile web app can be as performant as a native app.

Thanks to the Extensible Web Manifesto’s efforts to tighten the feedback loop between the editors of web standards and web developers, browser vendors started introducing new low-level APIs based on feedback from developers. The advent of these APIs brings unprecedented capabilities to the web. We, at Flipkart, decided to live on this bleeding edge and build a truly powerful and technically advanced web app while working to further evolve these APIs.

Here’s a sneak peek into how we’ve created an extremely immersive, engaging and high-performance app.

Immersive: While native apps are rich in experience, they come with the price of an install. And while web apps solved the instant-access problem, network connectivity still plays a significant role in defining the web experience. There have been multiple attempts at enabling offline web apps in the past, such as AppCache and using LocalStorage/IndexedDB. However, these solutions failed to model the complex offline use cases described below, making it painful to develop and debug issues. Service Workers replace these approaches by providing a scriptable network proxy in the browser that lets you handle requests programmatically. With Service Workers, we can intercept every network request and serve a response from cache even when the user is offline.


We chose to use SW-Toolbox, a Service Worker wrapper library that enables simple caching patterns such as NetworkFirst, CacheFirst or NetworkOnly. SW-Toolbox provides an LRU cache, which we use in our app for storing previous search results on the browse page and the last few visited product pages. The toolbox also has a TTL-based cache invalidation mechanism that we use to purge out-of-date content. Service Workers provide the low-level scriptable primitives that make all of this possible.
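A sketch of the kind of routing this enables (the URL patterns, cache names and limits below are illustrative, not our production configuration):

var toolbox = require('sw-toolbox'); // bundled into sw.bundle.js by sw.config.js

// Static bundles: serve from cache first, fall back to the network
toolbox.router.get(/\.(js|css)$/, toolbox.cacheFirst);

// Search results: try the network, fall back to an LRU cache of recent queries
toolbox.router.get('/search(.*)', toolbox.networkFirst, {
  cache: { name: 'search-results', maxEntries: 20 }
});

// Product pages: keep only the last few visited, and purge stale entries by age
toolbox.router.get('/(.*)/p/(.*)', toolbox.networkFirst, {
  cache: { name: 'product-pages', maxEntries: 10, maxAgeSeconds: 24 * 60 * 60 }
});

// Anything transactional goes straight to the network
toolbox.router.post('/(.*)', toolbox.networkOnly);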


Making the right solution work was as hard as devising it. We faced a wide spectrum of challenges from implementation issues to dev tooling bugs. We are actively collaborating with browser vendors to resolve these challenges.

One such significant challenge that emerged from our use of Service Workers was to build a “kill switch”. It is easy to end up with bugs in Service Workers and stale responses. Having a reliable mechanism to purge all caches has helped us to be proactively ready for any contingencies or surprises.

One more cornerstone of a truly immersive experience is a fullscreen, standalone experience launched right from the home screen. This is what the Add to Home Screen (ATHS) prompt allows us to do. When the user chooses to add to home screen, the browser creates a high-quality icon on the home screen based on the metadata in the Web Manifest. The ATHS prompt is shown automatically to the user based on a heuristic that is specific to each browser. On Chrome, if the user has visited the site twice within a defined period, the prompt will trigger. In the newer Chrome versions, we receive an event once we have matched the heuristic and can show the prompt at a later point in time.
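For reference, this is roughly the kind of metadata a Web Manifest carries (the values below are illustrative, not our actual manifest):

{
  "name": "Flipkart Lite",
  "short_name": "Flipkart",
  "start_url": "/",
  "display": "standalone",
  "theme_color": "#2874f0",
  "background_color": "#ffffff",
  "icons": [
    { "src": "/icon-192.png", "sizes": "192x192", "type": "image/png" }
  ]
}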

While the heuristic is indispensable to prevent spam on the web platform, we felt it was too conservative and convinced the Chrome team to tweak the heuristic for more commonly occurring scenarios. Based on our feedback, experiments are underway by the Chrome team to shorten the required delay between interactions.

Native apps use a splash screen to hide the slow loading of the home screen. The web never had this luxury, and there was a blank page staring at the user before the home screen could load. The good news is that the latest version of Chrome supports generating a splash screen, which radically improves the launch experience and perceived performance of the web app.


Another capability we’re championing is opening external links in the standalone app version rather than in a browser tab. Currently, there is a limitation with Android, but we are working with the Chrome team to enable this use case as soon as possible.  

Engaging: Being able to re-engage with our users on the web has always been a challenge. With the introduction of the Web Push API, we now have the capability to send Push Notifications to our users, even when the browser is closed. This is possible because of Service Workers, which live beyond the lifetime of the browser.
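On the service worker side, handling a push message boils down to a couple of event listeners. A minimal sketch (payload handling and deep-linking are simplified, and the notification text is just an example):

// Inside the service worker
self.addEventListener('push', function(event) {
  // For brevity we show a static notification; real payloads are resolved per message.
  event.waitUntil(
    self.registration.showNotification('Flipkart', {
      body: 'Your order has been shipped!',
      icon: '/icon-192.png',
      tag: 'order-update'
    })
  );
});

self.addEventListener('notificationclick', function(event) {
  event.notification.close();
  // Open (or focus) the app when the user taps the notification
  event.waitUntil(clients.openWindow('/'));
});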


High Performance: A highly performant mobile app is one that requests less data over the network and takes less time to render. With a powerful scriptable proxy and a persistent cache living in the browser, data consumption over the network can be reduced significantly. This also helps reduce the dependency on network strength and eliminates all latencies on a repeat visit.

Rendering performance has always been a challenge for the web. We identified significant improvements in performance when the GPU handled rasterization compared to the CPU doing it. Hence we decided to leverage GPU rasterization on Chrome (Project Ganesh) by including the required meta tag in our HTML. At the same time, we have carefully balanced the right number of GPU-accelerated composited layers by measuring composition vs. paint costs. Thirdly, we use GPU-friendly animations, namely opacity and transform transitions.

Profiling on various mobile devices using the Chrome DevTools Timeline panel and Chrome Tracing helped us identify multiple bottlenecks and optimization paths. This helped us make the best of each frame during an animation. We are continuously striving to achieve 60fps animations and interactions. We use the RAIL model for our performance budgets and strive to match and exceed expectations on each of the metrics.

All of this put together, manifested into a stellar experience for our users. It’s been a remarkable journey building this web app, working with browser vendors and pushing the limits of web platform on mobile. Over the coming weeks, we plan to roll out more detailed posts that deep-dive into the technical architectures, patterns and most importantly the lessons learned.

We believe more browser companies and developers will start thinking along these lines and make web apps even better. The web is truly what you make of it, and we have only just begun.

#FlipkartLite

#WeBuildAwesome

Last but not the least, meet the Flipkart Lite Team that did the magic — Abhinav Rastogi, Aditya Punjani, Boopathi Raja, Jai Santhosh, Abinash Mohapatra, Nagaraju Epuri, Karan Peri, Bharat KS, Baisampayan Saha, Mohammed Bilal, Ayesha Rana, Suvonil Chatterjee, Akshay Rajwade.

2015-11-05

(wish everyone was in the pic)

Is Data Locality Always Out Of The Box in Hadoop? Not Really!

Introduction

Hadoop optimizes for data locality: moving compute to data is cheaper than moving data to compute. It can schedule jobs on nodes that are local for the input stream, which results in high performance. But is this really out of the box? Not always! This blog explains a couple of data locality issues that we identified and fixed.

User Insights Team

Flipkart collects terabytes of user data, such as browse, search and order signals. The User Insights team uses these signals and derives useful insights about the user. These can be broadly classified as one of the following:

  • Direct Insights – Directly derived from user data, e.g. Location.
  • Descriptive Insights – Describe the user, their preferences, likes/dislikes, habits, etc., e.g. Brand Affinities, Product Category Affinities, Life-stage.
  • Predictive Insights – Predict the next behavior or event of a user, e.g. probability of purchase in session, cancellation/return.

These insights are used to personalize the user experience, give better recommendations and target relevant advertisements.

User Insights

HBase as the data store for User Insights

The collected data is stored and processed on a distributed Hadoop cluster. HBase is used for fast lookup of user signals during the generation of insights.

Multiple mappers hitting the same HBase region server

At times, full HBase table scans are required. During these scans, some mappers finish in less than 10 minutes whereas many take hours to finish, for HBase regions of approximately the same size. We picked the regions that were taking hours and ran them one by one; to our surprise, they also finished in under 10 minutes. But when run along with some other regions on the same region server, they slowed down. We started profiling the nodes where the jobs were running slowly and saw that CPU and memory were fine, but the network was choking. We used network profiling tools like nload and iftop to look at the network usage. This showed us that outbound network traffic was maxing out the bandwidth of the network card and slowing down outbound transfers. iftop showed that most of the outbound traffic was from the HBase region server process.

We suspected that the way the input data was split and maps were launched caused too many mappers to hit the same region server. Digging into the HBase code that creates the TableInputFormat with TableSplits for HBase scan jobs, we found that if the scan range has consecutive regions belonging to the same region server, it returns them in sequential order, and mappers are launched and allocated in the same order. This causes all the mappers to hit the same region server and choke its network bandwidth. We wrote the RoundRobinTableInputFormat below, which orders the splits in a round-robin way across the region servers involved, thereby reducing the probability of having a lot of mappers hitting the same region server.

Code
Gist Link: https://gist.github.com/sudhir-reddy/4eb430d4be9f9f404c38
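The reordering itself is simple interleaving. Here is a small sketch of the idea, in JavaScript for illustration only; the actual implementation extends TableInputFormat in Java (see the gist above):

// Given splits in region order, interleave them so that consecutive splits
// belong to different region servers and mappers spread their load.
function roundRobinSplits(splits) {
  var byServer = {};
  splits.forEach(function(split) {
    (byServer[split.regionServer] = byServer[split.regionServer] || []).push(split);
  });

  var servers = Object.keys(byServer);
  var ordered = [];
  var remaining = splits.length;
  for (var i = 0; remaining > 0; i++) {
    servers.forEach(function(server) {
      if (i < byServer[server].length) {
        ordered.push(byServer[server][i]);
        remaining--;
      }
    });
  }
  return ordered;
}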

YARN Fair Scheduler and Data Locality

Even when the cluster was free, we observed only 5-10% data locality for a job. Something seemed suspicious and we started looking at how data locality happens in YARN. The client submits the job to the YARN Resource Manager (RM), and the RM allocates a container for running the Application Master (AM) on one of the cluster nodes. The data to be processed is split into an InputSplit for each mapper task. The InputSplit has information on where the data for that split is located. The AM asks the RM to provide containers for running the mapper tasks, preferably on the nodes where the data is located. The RM uses the scheduler config to figure out which level of data locality should be offered to the container request. The RM allocates containers to run on Node Managers (NM) and the AM executes the mapper tasks in the allocated containers.

YARN Resource Manager

We use the FairScheduler in our YARN setup. FairScheduler has the configs yarn.scheduler.fair.locality.threshold.node and yarn.scheduler.fair.locality.threshold.rack to tweak data locality. On setting these values to maximum data locality (=1), we did not observe any change in data-local tasks. Digging deeper into the scheduler code, we found that these values have no effect without also setting the undocumented configs yarn.scheduler.fair.locality-delay-node-ms and yarn.scheduler.fair.locality-delay-rack-ms. Even after setting these, not a single mapper was going to a data-local node. Looking further into the scheduler and container assignment code, we figured out that data-local requests are allocated only if the node name of the available YARN Node Manager matches the node name for which the Application Master requested a container. Looking at the YARN Node Manager console, we saw that hosts were added with non-FQDN names (Fully Qualified Domain Name, eg: hadoop01) whereas HBase region servers were returning region server names with FQDNs (hadoop01.something.flipkart.com). These hosts had non-FQDN names in /etc/hostname and in the kernel (as seen with the `hostname` command). After updating the hostnames to FQDNs and running the job again, we got around 97.6% data locality and the job’s runtime reduced by 37%! While looking to file a bug on YARN, we found that one was already filed and fixed in YARN 2.1.0. If you are using a version of YARN older than 2.1.0, set the hostname to the FQDN to get better data locality.

The above were two instances that caution us about data locality assumptions in Hadoop, and they are worth addressing by teams that run large Hadoop-based MR workloads.

Foxtrot – Event Analytics at Scale

Health and monitoring of actors participating in an SOA-based distributed system like the one we have at Flipkart is critical for production stability. When things go south (and they do), not only do we need to know about the occurrence instantly, but we also need to get down to exactly when and where the problem happened. This points to the need for us to get access to and act on aggregated metrics as well as raw events that can be sliced and diced at will. With server-side apps spanning hundreds of virtual machines and client-side apps running on millions of diverse phones, this gets amplified. Individual requests need to be tracked across systems, and issues found and reported much before customers get impacted.

Introducing Foxtrot

Foxtrot is a system built to ingest, store and analyze rich events from multiple systems, providing aggregated metrics as well as the ability to slice, dice and drill down into raw events.

Foxtrot in action

Foxtrot provides a configurable console to get a view of systems; enables people to dig into as well as aggregate raw events across hosts and/or client devices; and provides a familiar SQL based language to interact with the system both from the console as well as from the command line.

A case for event analytics at scale

Let’s consider a few real-life situations:
  • Each day, hundreds of thousands of people place orders at Flipkart through the Checkout system. The system interacts with a bunch of downstream services to make this possible. Any of these services failing in such situations degrades the buying experience considerably. We strive to be on top of such situations, proactively monitoring service call rates and response times so that we can quickly stop, debug and re-enable access to degraded services.
  • Mobile apps need to be fast and efficient, both in terms of memory and power. Interestingly, we have very little insight into what is actually happening on the customer’s device as they interact with the apps. Getting proper data about how our app affects these devices helps us make it faster, lighter and more power efficient.
  • Push notifications bring important snippets of information and great offers to millions of users of our apps. It’s of critical importance for us to know what happens to these messages, and control the quality and frequency of these notifications to users.
  • Book publishers push data like book names, authors to us. We process and publish this data to be shown on the apps and the website. We need to monitor these processing systems to ensure things are working properly and quickly catch and fix any problems.

These and many more use-cases are currently being satisfied by Foxtrot at Flipkart and Olacabs.

Basic Abstractions

Foxtrot works on the following very simple abstractions:

  • Table – The basic multi-tenancy container used to logically separate data coming in from different systems. So your Foxtrot setup might have tables for website, checkout, oms, apps etc. A table in Foxtrot has a TTL, and the data does not remain queryable from FQL, the console and JSON queries (see below) once the TTL expires. Data for the table is saved in HBase and can be used independently by M/R jobs on Hadoop, depending on the expiry time specified for the column family in the table.
  • Document – Most granular data/event representation in Foxtrot. A document is composed of:
    • id – A unique id for the document. This must be provided.
    • timestamp – A timestamp representing the time of generation of this event.
    • data – A json node with fields representing the metadata for the event.
//A sample document
{
    "id": "569b3782-255b-48d7-a53f-7897d605db0b",
   "timestamp": 1401650819000,
    "data": {
        "event": "APP_LOAD",
        "os": "android",
        "device": "XperiaZ",
        "appVersion": {
            "major": "1",
            "minor": "2"
        }
    }
}

Usage

Foxtrot has been designed from the ground up for simplicity of use. The following sections detail how events can be pushed to Foxtrot and how they can be summarized, viewed and downloaded.

Pushing data to Foxtrot

Pushing data involves POSTing documents to the single or bulk ingestion API. For more details on ingestion APIs please consult the wiki.
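As an illustration, pushing the sample document shown earlier could look something like this. The host and the endpoint path here are assumptions for the sake of the example; consult the wiki for the actual ingestion API.

var http = require('http');

var doc = JSON.stringify({
  id: '569b3782-255b-48d7-a53f-7897d605db0b',
  timestamp: Date.now(),
  data: { event: 'APP_LOAD', os: 'android' }
});

var req = http.request({
  host: 'foxtrot.example.com',            // hypothetical host
  path: '/foxtrot/v1/document/test',      // assumed path; "test" is the table name
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Content-Length': Buffer.byteLength(doc)
  }
}, function(res) {
  console.log('ingestion status:', res.statusCode);
});

req.end(doc);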

Accessing data from Foxtrot

Once events are ingested into Foxtrot, they can be accessed through any of the many interfaces available.

Foxtrot Console (Builder)

Foxtrot in action

Foxtrot provides a console system with configurable widgets that can be backed by JSON queries. Once configured and saved, these consoles can be shared by url and accessed from the consoles dropdown.

Currently, the following widgets are supported:

  • Pie charts – Show comparative counts of different types for a given time period
  • Bar charts – Show comparative counts of different types for a given time period
  • Histogram – Show count of events bucketed by mins/hours/days over time period
  • Classified histogram – Show count of multiple type of events bucketed by mins/hours/days over time period
  • Numeric Stats histogram – Show stats like max, min, avg and percentiles over numeric fields  bucketed by mins/hours/days over time period
  • Tables – Auto-refreshing tabular data display based on FQL query (see below)

Each of these widgets comes with customizable parameters like:

  • Time window of query
  • Filters
  • Whether to show a legend or not etc.

Foxtrot Query Language

A big user of these kinds of systems is our analysts, and they really love SQL. We, therefore, support a subset of SQL that we call FQL (obviously!!).

Example: To find the count of all app loads and app crashes in the last one hour, grouped by operating system:

select * from test where eventType in ('APP_LOAD', 'APP_CRASH') and last('1h') group by eventType, os

Results:
+-------------------------------+
| eventType | os      |   count |
+-------------------------------+
| APP_CRASH | android |   38330 |
| APP_CRASH | ios     |    2888 |
| APP_LOAD  | android | 2749803 |
| APP_LOAD  | ios     |   35380 |
+-------------------------------+

FQL queries can be run from the console as well by accessing the “FQL” tab.

Details about FQL can be found in the wiki.

CSV Downloads

Data can be downloaded from the system by POSTing a FQL query to the /fql/download endpoint.

Json Analytics Interface

This is the simplest and most fundamental mode of access to the data stored in Foxtrot. Foxtrot provides the /analytics endpoint that can be hit with simple JSON queries to access and analyze the data that has been ingested into Foxtrot.

Example: To find the count of all app loads in the last one hour, grouped by operating system and version:

//Request
{
    "opcode": "group",
    "table": "test",
    "filters": [
        {
            "field": "event",
            "operator": "in",
            "value": "APP_LOAD"
        },
        {
            "operator" : "last",
            "duration" : "1h"
        }
    ],
    "nesting": [
        "os",
        "version"
    ]
}
//Response
{
    "opcode": "group",
    "result": {
        "android": {
            "3.2": 2019,
            "4.3": 299430,
            "4.4": 100000032023
        },
        "ios": {
            "6": 12281,
            "7": 23383773637
        }
    }
}

A list of all analytics and their usage can be found in the wiki.

Raw access

The raw data is available on HBase for access from map-reduce jobs.

Technical Background

Foxtrot was designed and built for scale, with optimizations in both code and query paths to give the fastest possible experience to the user.

Requirements

We set out to build Foxtrot with the following basic requirements:
  • Rapid drill down into events with segmentation on arbitrary fields, spanning a reasonable number of days – This is a big one. Basically, analysts and users should be able to slice and dice the data based on arbitrary fields in the data, and this facility should be available over data for a reasonable amount of time.
  • Fast ingestion into the system. We absolutely cannot slow down the client system because it wants to push data to us.
  • Metrics on systems and how they are performing in near real-time. Derived from above, basically aggregations over the events based on arbitrary criteria.
  • Multi-tenancy of clients pushing data to it. Clean abstractions regarding multi-tenancy right down to the storage layers, both for the query and raw stores, so that multiple teams can push and view data without having to worry about stability and/or their data getting corrupted by event streams from other systems.
  • Consoles that are easy to build and  share. We needed to have a re-configurable console that teams could customize and share with each other. The console should not buckle under the pressure of millions of events getting ingested and many people having such consoles open to view real-time data on the same.
  • SQL interface to run queries on the data. Our analysts requested us to provide a SQL-ish interface to the system so that they can easily access and analyze the data without getting into the JSON DSL voodoo.
  • CSV downloads of recent data. This, again, was a request from analysts, as they wanted access to the raw events and aggregated results as CSV.
  • Extensibility of the system. We understood that teams will keep on requesting newer analytics and we would not be able to ship everything together at start. We had to plan to support quick development of newer analytics functions without having to touch a lot of the code.

Technology Used

Foxtrot unifies and abstracts a bunch of complex and proven technologies to bring a simple user experience in terms of event aggregation and metrics. Foxtrot uses the following tech stack:

  • HBase – Used as a simple key value store and saves data to be served out for queries and for usage as raw store in long-term map-reduce batch jobs.
  • Elasticsearch – Used as the primary query index. All fields of an event are indexed for the stipulated time and then TTL out. It does not store any documents, only row keys. We provide an optimized mapping in the source tree to get the maximum performance out of Elasticsearch.
  • Hazelcast – Used as a distributed caching layer; it caches query results for 30 secs. Cache keys are time-dependent and change with the passage of time. Older data gets TTLd out.
  • SQL parser – For parsing FQL and converting them into Foxtrot JSON queries
  • Bootstrap, Jquery, HTML5, CSS3 –  Used to build the console

Architecture

The Foxtrot system uses a bunch of battle-tested technologies to provide a robust and scalable system that meets the various requirements of teams and allows for the development of more types of analytics functions on top of it.

Foxtrot Architecture

  • During ingestion, data is written to the Elasticsearch quorum and to HBase tables. Writes succeed only when events are written to both stores. HBase is written to before Elasticsearch, so data written to HBase does not become discoverable until it is indexed in Elasticsearch.
  • During query, the following steps are performed:
    • Translate FQL to Foxtrot JSON/native query format
    • Create cache key from the query.
    • Return results if found for this cache key.
    • Otherwise, figure out the time window for the analytics
    • Translate Foxtrot filters to elasticsearch filters
    • Execute the query on Elasticsearch and HBase and cache the results on Hazelcast if the action is cacheable and of reasonable size.

Misc

  • We try to avoid any heavy lifting during document ingestion. We recommend and internally use the /bulk API for event ingestion.
  • We have built in support for discovery of Foxtrot nodes using the /cluster/members API. We use this to distribute the indexing requests onto these different hosts, rather than sending data through a single load-balancer endpoint. We have a Java Client Library that can be embedded and used in client applications for this purpose. The Java client supports both on disk-queue based and async/direct senders for events.
  • We optimize the queries for every analytics by controlling the type of elasticsearch queries and filters being generated. Newer analytics can be easily added to the system by implementing the necessary abstract classes and with proper annotation. The server scans the classpath to pick up newer annotations, so it’s not mandatory for the analytics to be a part of the Foxtrot source tree.
  • At runtime, the time window is detected and queries are routed to the indexes that are relevant to the query period. For efficiency reasons, always use a time-based filter in your queries. Most analytics will automatically add a filter to limit the query. Details for this can be found in the individual analytics wiki pages.
  • We have added a simple console (Cluster tab of the console) with relevant parameters that can be used to monitor the health of the elasticsearch cluster.

Conclusion

Foxtrot has been in production for the better part of the year now at Flipkart and has ingested and provided analytics over hundreds of millions of events a day ranging over terabytes of data. Quite a few of our teams depend on this system for their application level metrics, as well as debugging and tracing  requirements.
Foxtrot is being released with Apache 2 License on github.
The source, wiki and issue tracker for Foxtrot is available at: Foxtrot github repository.
We look forward to you taking a look at the project and actively using and contributing to it. Please drop us a note if you find Foxtrot useful. Use github issues to file issues and feature requests. Use pull requests to send us code.
Special thanks to the Mobile API, checkout and notifications teams for their support. Many thanks to Regu (@RegunathB) for providing some very important and much needed feedback on the usability of the system and the documentation as a whole, and for guiding us meticulously through the process of open-sourcing this.

Aesop Change Propagation System

We have open sourced Aesop, the Flipkart Change Propagation system, following the announcement of intent at slash n – the Flipkart tech conference. This post describes the use of Aesop in one of our production systems.

PayZippy

PayZippy is an online payment solution launched by Flipkart. It is a safe and easy payment solution for e-commerce users and online merchants.

Need for change event propagation

It is fairly common for products and services to use relational databases to store business-critical data. These data stores therefore become the source of truth for such data.
However, relational databases may not scale well for all kinds of use cases. Use cases such as analytics, reporting and search indexing need this data in secondary data stores. Some of these secondary data stores are non-relational and are efficient at handling such use cases. There is then a need for a system to transfer data from the primary data store to these secondary data stores.
A number of tools and products in the ETL (Extract, Transform, and Load) space may be used to transfer data. However, these are batched and do not transfer data in real time.
At PayZippy, data from these secondary stores is used to feed more than just business decisions. The data from these secondary data stores feeds into real-time use cases like the console, fraud detection and monitoring systems.
We therefore needed a system that can transfer data across data stores in real time.

Introducing Aesop

Aesop is a keen observer of changes that can also relay change events reliably to interested parties. It provides useful infrastructure for building Eventually Consistent data sources and systems.

Overall Architecture


The main components are the following :
Relay Server

  • Reads changes from Source Data Sources. Converts the changes to a Serializable form by using the schema registry. Stores the events in an internal buffer.
  • Listens to requests from Clients and transports events to the clients. Provides clients with only those events pertaining to the sources and partitions for which the client has registered.

Relay Client

  • Single Client : It calls the Relay Server to check for new events. On receipt of events from the server it executes business logic – for e.g. writing to a destination data store. It checkpoints the SCN. If it falls off the relay, it connects to the bootstrap for events. It reconnects back to the Relay once it catches up.
  • Participant in Cluster Client : It calls the Relay Server for events only for the partitions assigned to the client. It processes cluster status change events from Helix and acts accordingly. It checkpoints the SCN to shared storage.

Bootstrap

  • Bootstrap Producer : Similar to a Client. It checks for new data change events on relays and stores those events in a MySQL database. The MySQL database is used for bootstrap and catchup for clients.
  • Bootstrap Server : Similar to Relay Server. It listens for requests from Databus Clients and returns long look-back data change events for bootstrapping and catch up.

High Availability, Load Balancing and Scaling

High Availability, Load Balancing and Scaling are built into Aesop. Since PayZippy relies on the data moved via Aesop for real-time use cases, these were essential to the strategy.

Client Clustering

  • High Availability and Load Balancing on the clients is achieved by having partitioned clients participate in a cluster, with each client handling a partition (a part of the data set). The partitions are dynamically assigned via Helix. The default partition function used to filter the events is MOD.
  • Partition reassignment when instances join or leave the cluster is automatic, and the load (number of partitions assigned) is uniform across instances.
  • The filtering itself is performed at the Relay and Bootstrap. Checkpoints for partitions are stored in Zookeeper. This uses the partitioning functionality available in LinkedIn Databus.

Relay – HA

High Availability of Relay Server is achieved using one of the following approaches :
Multiple Relay Servers read from the Source Data Sources.

  • The Clients connect to Relay Server via a load balancer.
  • Since the requests from clients are over HTTP, one or both of the Relay Servers can be serving requests based on the configuration in the load balancer.
  • When one Relay goes down, the other can still handle requests.

Relay Chaining with HA using Leader Follower model (Not yet implemented)

  • Relay Producer reads from another Relay. This Relay can act as normal Relay for Clients.
  • Relays have both producers, a Source Producer and a Relay Producer, but only one is active at a time. On the leader Relay the source producer is active; on the follower Relay the relay producer is active.
  • Leader/Follower election is done using Helix.

Refer to the Aesop GitHub Wiki for more information on Aesop architecture and design.

Aesop at PayZippy

Blocking Startup Bootstrap

We needed to transfer existing data from MySql to destination data stores. We assumed we could use the same Relay Server and Client mechanism by pointing the Relay to the first bin log.
However we faced the following issues:

  • The throughput of the client was dependent on that of the destination data store. This turned out to be far less than that of the Relay; the Relay was twice as fast as the client.
  • The clients would fall off the Relay. The buffer in the Relay does not help, as the Relay throughput is far higher than that of the client, and hence the buffer fills up and starts overwriting entries even before the client has been able to pull that set of events.

Solution : Blocking Startup Bootstrap

  • Thin Producer : A producer, similar to the producer in the Relay, that can pull events from the Source or have events pushed to it from the Source. Processing related to serialization is skipped.
  • Partitioned Consumers : Consumers consume only their particular partition, in order, and execute business logic. In this case, they insert into the destination data store.

Consumers

The consumer currently supported in PayZippy writes to a destination data source – for e.g. denormalized MySQL or HBase. The consumer calls the Event Mapper and Transformer to get an event pertaining to the schema of the destination data source. The transformed event is then written to the destination data source.

Monitoring and Alerting

Monitoring and alerting were important as the change propagation system is being used for real-time use cases. They are supported using:

  • JMX Monitor (soon to be open sourced) connects to the JVMs running Aesop, fetches metrics and publishes them to StatsD/Graphite. It also raises alerts based on configured thresholds.
  • Skyline (changes by Flipkart are not yet open sourced) is used to raise alerts based on the configured algorithm.
  • Aesop Dashboard – The dashboard provides a real-time view of the change propagation pipeline.

Performance

We ran a few benchmarks for Aesop on standard 4 core, 16GB virtual machines running 64-bit Debian Linux. The Aesop components were run on separate VM instances. Performance numbers from the benchmarks:

Relay Server : Handles 19 million events an hour per data source, i.e. 18.2 GB per hour.
Relay Client : Can process data at the same speed as the server. However, it is limited by the capability or speed of the end database. For the MySql client we are able to insert/update events at the rate of 10 million events an hour.
Latency : The latency between the source data source and the destination data source is within 80-90 milliseconds.

Appendix :  Design Decisions

Dual Writes v/s Log Mining

Of the various ways of having data in multiple stores, two were evident.
Dual Writes : Application writes to destination data stores, synchronously or asynchronously. Application can also write to a Publisher-Subscriber system in which the Subscribers are consumers that eventually write to Destination Data stores

  • Pros : Appears Easy : Application can publish the same event that is being inserted/updated in the Primary Data Source.
  • Cons : Difficult to maintain consistency
    • Updates with a non-primary-key where clause. For cases where there are bulk updates (updates with a non-primary-key where clause), the application would then have to fetch the affected records and publish the changes.
    • Difficult to ensure all applications use the common data layer which publishes changes as well.
    • Manual changes in Primary Data Store will be missed.

Log Mining : Separate application/service can extract changes from Database commit logs and publish them. This would use the same approach used by database for replication.

  • Pros : Consistency can be guaranteed as changes are being read from commit logs (bin log in case of MySql).
  • Cons
    • Appears tough – But definitely possible.
    • Tied to mechanism used by database for replication. Tied to commit log format, etc … Tightly coupled approach.

Since consistency across datastores is of paramount importance to a financial system like PayZippy, we chose the Log Mining approach.

Approaches to Log Mining – Bin Log Parser vs Storage Engine

MySql Bin Log Parsing

  • Pros : Familiar approach
    • Open source software was available that parsed MySql bin logs: Open Replicator and Tungsten Replicator.
  • Cons
    • If the format of the bin logs changes, the parser would have to change.
    • Open Replicator supported MySql version 5.5. We would have to modify Open Replicator to support MySql v5.6 and the checksum feature introduced in v5.6.

Custom Storage Engine

  • Pros : Independent of binlog format. Layers above Storage Engine take care of parsing.
  • Cons : Unfamiliar approach. Unknown pitfalls.

We decided to go with known pitfalls and picked Bin Log Parsing approach.

slash n: intelligence @ scale

Let me start this blog with a note of thanks – to you, the engineer! Whether it’s an engineer in San Francisco who open sourced a framework, or an engineer in Beijing who came up with a new technique, or an engineer in Bangalore whose commits increased Hadoop’s performance by 10% – it’s your work that allows us to focus on solving our problems rather than being distracted by solving everything under the sun. Flipkart wouldn’t have been here without the support of the open source tech community worldwide.

For us, slash n, our flagship technology event, is a celebration of that liberating engineering philosophy of sharing. Building India’s largest e-commerce platform, we’ve learnt a thing or two, created a few pieces of technology, and figured out what techniques/processes work and what don’t. We are committed to share our learnings and to open up our technologies and platforms to the community. slash n is a forum for us to do so and also the forum to learn from the experience of others in the industry, who share the same philosophy about technology.

On Mar 7th, we had the 2nd edition of slash n, and what a day it was! Over 500 engineers participated in the event, of which half were external invitees. That’s more than double the number of participants we had last year. Considering it was still an invite-only event, the interest, enthusiasm and participation shown by the tech community inside and outside of Flipkart was beyond expectations. Unlike last year, when we had most of the talks by Flipkart engineers, this year more than half of the talks were by external speakers from a diverse set of organizations. Here are some highlights of the day:

  • The day started with Amod, head of technology at Flipkart, reinforcing the importance of sharing in the technology community, and slash n as our means of doing that. He also committed to open sourcing a few key technology elements at Flipkart over the next 2 quarters, in particular RestBus (messaging system for transactionally consistent service interactions), Aesop (change propagation system) and some of our mobile app technologies.
  • In his keynote, Sachin, founder and CEO of Flipkart, outlined the journey of eCommerce in India, the significant problems that got solved, what challenges lie ahead and how technology can address those challenges in future.
  • This was followed by two talks from very different areas – molecular biology and mobile ad network. Ramesh from Strand Life Sciences talked about how they are using advances in computing to make genetic disease discovery available at sustainable cost to everyone. Rajiv from InMobi talked about their efforts around mining large amount of user data to provide better mobile ads. It was interesting to note that uniquely identifying users across devices, apps & browsers remains the holy grail of personalization on the internet.
  • Some of the most popular talks (based on how many people added them to their schedule using slash n mobile app) included
    • Art of Promising by Yogi & Vikas from Flipkart, which peeled the layers off how Flipkart makes and keeps the availability and SLA promise to customers.
    • Soothsayer @ Flipkart by Ananda and Mohit, which talked about internals of Flipkart’s demand forecasting and inventory planning system.
    • Cataloging talk by Utkarsh and Abhishek from Flipkart, which talked about evolution of our catalog from books to 30+ M items today and how the team addressed the issues around scale, availability and agility along the way.
    • Job Scheduling in Hadoop by Joydeep from Qubole, which provided details on issues around Hadoop job scheduling as well as his experience of building Fair Scheduler and Corona Job Scheduler.
    • Participants loved the newly introduced fire talks – 15-minute quick discussion on a very focused tech topic.
    • Another highlight of the day was a panel discussion on hope, hype and reality of big data analytics, which saw healthy debate among data scientists from diverse organizations like Flipkart, IBM, Xurmo, UIDAI and Mayin, which are trying to use big data analytics to solve problems in different domains.
    • Twitter was abuzz throughout the day with participants tweeting their learnings, questions, discoveries at #slashn.

The atmosphere in the event was quite electric with interesting talks and engaging debates, which often continued between speakers and participants beyond the talk.

IISc Bangalore was the venue of slash n
Keynote address by Sachin Bansal

Guest talk by Ramesh Hariharan from Strand Life Sciences
Panel discussion on big data analytics

The focus for this year’s event was on ‘Intelligence @ Scale’. The theme encapsulates what we are trying to do from a technology perspective at Flipkart, and the effort was to share our learnings in this direction and to learn from others. We believe that large scale can become a strategic differentiator if we can use it to make the life of users better continuously. And this can happen when the large amount of data generated by user activities is used, in real time, to make the user experience better via systems that learn continuously from each user interaction. slash n saw engineers from diverse fields like eCommerce, molecular biology, education, social sciences, mobile ad networks and cloud infrastructure talk about their approaches to building ‘intelligence @ scale’ in their respective domains.

Tech bonding at scale!
Full room means good engagement

The day re-established our core belief that knowledge and technology are meant to be shared, and that doing so can create a virtuous cycle of innovation and progress not only for us but for the entire ecosystem. I hope everyone who participated had some key takeaways in terms of learning (and a few more connections on your favorite social network), and those who could not can still watch the recordings of all the talks on the event website http://slashn.flipkart.net/

We would like slash n to evolve into a more democratic and open platform to share knowledge and possibly to collaborate on building technologies for future. See you all at next year’s event – let’s celebrate the freedom at a grander scale and collaborate more deeply to solve problems that matter.

GraceKelly: A best effort cache synchronization library for distributed systems

GraceKelly is a best effort cache synchronization library for distributed systems.

In the following blog post I’ll explore the motivations for such a library and give a brief introduction to GraceKelly.

GraceKelly is open source and is available here: GraceKelly on Github

A Chaotic Place

The average visitor on Flipkart is not aware of the scale at which we operate. Day in and day out we handle millions of page views, thousands of transactions, and millions of updates for many things including price, availability, offers and recommendations. Under the hood of the calm, functional facade of the website, there is a complex network of services that are constantly interacting with each other.

We have services that handle everything ranging from search to product serviceability at a given pin code.

An arbitrary web request that hits one of our webservers can spawn a plethora of requests to a bunch of back-end services, which in turn might be dependent on other services.

The back-end services respond with different responses at variable latency, and the responses are collated, made sense of, transformed and finally a web response is created and served to the user as a web page.
The variability of the requests and responses that traverse the complex network of services, while being transformed, multiplexed, demultiplexed and altered, makes for a chaotic environment.

Distributed Service Environment

Chaos means unpredictability, and unpredictability is bad. When a user requests a page, the page load time must be predictable. When a product goes out of stock, the amount of time it takes to reflect on the product page needs to be predictable. We need predictability around service SLAs. Service SLAs are dependent on the load under which the service is operating. This means we need predictability around service load as well. We can’t operate in an environment where one minute a single server is able to handle production traffic and the next minute a whole cluster is buckling under the load. So we try to grab and hold on to as much predictability as we can, wherever possible.

Caches to the rescue

Caches act as sentinels in a distributed service environment. Although their primary function is to reduce latency, when used appropriately they excel at bringing predictability to a system. This is because a cache request is extremely predictable, with almost no variability, either in response times or in the load per request. This is down to the simple data access pattern for a cache request. If a given front-end request hits caches at all the back-end services, we can predict with high confidence the load and response latency of the given request on each service. One could say that there is a positive correlation between the percentage of cache hits and the predictability of a system/environment.

Caches To The Rescue

Throwing a spanner in the works

Every time there is a cache miss, both our distributed environment and its SLAs become a little bit more vulnerable. In the face of these risks a common pattern of cache usage seems inappropriate. One of the most common ways of updating the data stored in caches is to have an expiry TTL for every cache entry. Once this time to live expires, the cache entry is removed from the cache and is no longer accessible until another request repopulates/synchronizes the cache. Using an expiry TTL in this way exposes the underlying system to a potentially harmful request-pattern load for the duration of synchronization. Imagine a sequence of events like the following:

  • t0 – a heavily requested cache entry c1 expires
  • t1 – there is a cache miss for c1 and a request is sent to the service to fulfill
  • t2 – the cache is repopulated with a fresh entry for c1

The time between t1 and t2 is the duration of exposure. During that time all requests for c1 that miss the cache are let through into the distributed environment. The predictability of the target service, and of all the services it depends on, is affected during this time by the per-request load and the qps of all requests that result in a cache miss for c1. Caches could be updated better than this.

Refresh don’t expire

Refreshing the cache without removing the cache entry solves the problem of exposure that cache expiry brings. In a cache refresh strategy, once a value is cached, all requests for the value are served out of the cache and don’t hit the service or services at the back-end. Periodically the cache is synchronized with values from back-end services to keep the data up to date and consistent with back-end systems. This means that for all the values that are cached, the load on the back-end systems is extremely predictable. At the same time the response latencies are highly predictable for these cached values.

Many services/systems would be better served by refreshing the cache rather than expiring it. The efficacy of such a strategy depends on the kind of service in question. For services that have zero tolerance for stale data, best effort refreshing instead of expiring the cache entry doesn’t make sense. However, many services can tolerate stale data to a certain degree. For example, a stock availability service cannot accommodate stale data, while a review and rating service can still have stale data cached for a little while.

There are some popular strategies that are used to implement a refreshing cache.

  1. Global TTL, with a refreshing process: the most common way of implementing a refreshing cache is by running a separate process or thread that periodically refreshes all the entries in the cache. The shortcoming of this strategy is that it is only appropriate where there is no variability in the staleness of data that is cached. e.g. a search engine service’s cache can be refreshed once every 30 minutes if the re-indexing happens only once every 30 minutes.
  2. Fine grained TTL, with a monitoring & refreshing process: In this strategy, a separate process or thread constantly monitors the cache entries to see which of them have expired and refreshes them accordingly. This approach gives finer-grained control over the cache refresh lifecycle of each cache entry. However, running a separate process means one more component in your environment that needs to be monitored and maintained.

What would be good to have is a cache library with regular caching semantics, but one that accommodates refreshing a cache entry rather than expiring it based on ttl. This is exactly what GraceKelly is; it is inspired by Google Guava’s LoadingCache.

Cache me if you can

GraceKelly is a best effort cache synchronization library that tries its best to refresh any cache entry that has expired. The refresh lifecycle is solely request-triggered and doesn’t monitor/maintain the cache. This means the refresh is not started when a cache entry expires but rather when the first request for an expired cache entry is made. It is best effort because if synchronization/refresh of a cache entry fails, it can fall back to the stale version of the data already present in the cache.

For every request

  • It looks up the cache and returns the value if a cache entry is present.
  • If the returned cache entry has expired it dispatches a task to refresh the cache entry.
  • If for some reason the refresh fails, it can extend the ttl of the existing entry or do nothing.

Note that a cache entry is never removed (though it can be evicted by size constraints). This enables us to

  • Shield the backend services and systems from exposure to unnecessary request load.
  • Decouple response SLAs from backend degradation and availability concerns, thereby allowing for graceful degradation with stale data as fallback.

The Library

The GraceKelly library consists of a single class, Kelly, that takes implementations of two interfaces, a CacheProvider and a CacheLoader. They pass around a generic type, CacheEntry.

  • Kelly: This is the core of the library that acts as a proxy to CacheProvider and is responsible for reloading the cache using the CacheLoader.
  • CacheProvider: Interface whose implementation provides the actual caching functionality, e.g. a CacheProvider implementation for CouchBase, or a CacheProvider wrapper around a ConcurrentHashMap.
  • CacheLoader: Interface whose implementation allows one to reload a CacheEntry based on key and value of the expiring CacheEntry.
  • CacheEntry: Generic type that contains key, value and ttl information.
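
To make the relationship between these pieces concrete, here is a minimal, illustrative sketch in Java. The method names and signatures below are assumptions based on the description above, not GraceKelly’s exact API; see the GitHub repository for the real interfaces.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic cache entry carrying key, value and ttl (names assumed for illustration).
class CacheEntry<V> {
    final String key;
    final V value;
    final long expiresAtMillis;
    CacheEntry(String key, V value, long ttlMillis) {
        this.key = key;
        this.value = value;
        this.expiresAtMillis = System.currentTimeMillis() + ttlMillis;
    }
    boolean expired() { return System.currentTimeMillis() > expiresAtMillis; }
}

// Provides the actual caching functionality, e.g. backed by Couchbase or a ConcurrentHashMap.
interface CacheProvider<V> {
    CacheEntry<V> get(String key);
    void put(CacheEntry<V> entry);
}

// Knows how to rebuild an entry from the expiring one, typically by calling the back-end service.
interface CacheLoader<V> {
    CacheEntry<V> reload(CacheEntry<V> expired);
}

// The Kelly proxy: always serve from the cache, refresh expired entries in the background.
class Kelly<V> {
    private final CacheProvider<V> provider;
    private final CacheLoader<V> loader;
    private final ExecutorService refresher = Executors.newFixedThreadPool(2);

    Kelly(CacheProvider<V> provider, CacheLoader<V> loader) {
        this.provider = provider;
        this.loader = loader;
    }

    V get(String key) {
        CacheEntry<V> entry = provider.get(key);
        if (entry == null) return null;            // plain miss: the caller populates the cache
        if (entry.expired()) {
            // Request-triggered, best effort refresh; the stale value is still returned below.
            refresher.submit(() -> {
                try {
                    provider.put(loader.reload(entry));
                } catch (Exception e) {
                    // Refresh failed: keep serving the stale entry (best effort).
                }
            });
        }
        return entry.value;
    }
}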

GraceKelly is open source and is available here with example code and documentation: GraceKelly on Github

Proxies for resilience and fault tolerance in distributed SOA

On-line Content and Transactions
OLTP systems are characterized by their ability to “respond immediately to user requests”. The common understanding of a transaction in OLTP is within the context of a database where data is read and written with appropriate, albeit varying, durability, integrity and consistency guarantees.
OLTP applications are quite varied and depend largely on domain and purpose. Volume and Variety characteristics of data are different as well. For example, consider these differences among a Banking application, an eCommerce web-site and a Social media portal. The OLTP classification of systems is therefore quite broad, but the basic premise remains: respond immediately to user requests.

Over the years systems have started to embrace BASE over ACID and Eventual Consistency is acceptable. Fundamental assumptions around write-time consistency are challenged (Eric Brewer on BASE vs ACID) and Availability trumps Consistency at scale. At Flipkart, web-site availability is treated pretty seriously as the website evolves into a platform for delivering rich, personalized and relevant content to the user.

A typical web page on Flipkart has a good mix of static content (delivered off CDN) and data sourced from a number of backend systems as shown here:
FK.com

Rendering a single such page requires about 2MB of data read/write – comprising Product information, Category tree, User session, Logging and Metrics. The data volume for a single day works out to about 30TB. Delivering this Reliably is hard due to the inescapable consequences of the CAP theorem. However, Responding Immediately to users is Do-Able if loss in Consistency & Data is statistically insignificant.

The Availability Myth
The following listing maps functionality to data stores and protocol services:
website services

Evidently, different stores are used and often with good reason. A few examples: Category information is structured and is stored in MySQL; User sessions are many and are sharded across Couchbase and MySQL; Search uses Apache Solr as a secondary index; Notification data for a user exhibits Temporal Proximity and is stored in HBase; User Recommendations are keyed lookups on Redis; and Metrics are stored in the OpenTSDB time series database.

Access to the varied data stores is implemented as SOA services with the primary objective of distribution, de-coupling and interface-defined abstraction. Each service cluster has redundant nodes and provides an availability guarantee of 99.9% or more.
Running a website that depends on 15 services, each with 99.9% availability, we get

99.9% ^ 15 = 98.5% uptime 
(probability of all services providing 99.9% availability at the same instant of time)

This translates to 2+ hours of downtime per week. In reality, it is generally worse. The cost of running an “always available” service or data store is prohibitively high – accounting for redundancies, backups, near real-time replication of data (strong consistency) and seamless failover. Again, this may be attempted with datastore software that supports all of this and really works!
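
A quick back-of-the-envelope check of the numbers above (illustrative Java, not production code):

public class AvailabilityMath {
    public static void main(String[] args) {
        double perService = 0.999;  // 99.9% availability for each service
        int services = 15;
        // Probability that all 15 services are available at the same instant.
        double composite = Math.pow(perService, services);      // ~0.985
        double weeklyDowntimeHours = (1 - composite) * 7 * 24;  // ~2.5 hours
        System.out.printf("composite availability: %.2f%%%n", composite * 100);
        System.out.printf("weekly downtime: %.1f hours%n", weeklyDowntimeHours);
    }
}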

Latency impacting Availability
Part of the availability problem lies in service invocation. Different styles of service access and their use among developers are depicted in this infographic:

Service access

Services are often synchronous and the invocation pattern easily translates to making an API call. The API method signature is complete w.r.t data types, method name and errors/exceptions. Service clients handle errors intuitively, and at times are forced to by the API contract. Consequently, most of us code to handle Exceptions/Errors, not Latency!

Handling latencies, on the other hand, is more involved and requires using techniques like Callbacks and implementations such as Java Futures. Programming is not straightforward as callbacks don’t compose well – sequencing and combining async calls is not easy. Moreover, there aren’t many service client libraries that do this transparently.
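
Purely as an illustration of what composing asynchronous calls looks like in Java, here is a sketch using CompletableFuture; the service calls are stand-ins, not Flipkart’s actual clients:

import java.util.concurrent.*;

public class AsyncComposition {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Two hypothetical back-end calls running concurrently.
        CompletableFuture<String> product = CompletableFuture.supplyAsync(() -> fetchProduct("id-123"), pool);
        CompletableFuture<String> reviews = CompletableFuture.supplyAsync(() -> fetchReviews("id-123"), pool);
        // Combine the two responses into one page fragment and bound the overall wait,
        // so that back-end latency cannot pile up inside the web tier.
        String fragment = product.thenCombine(reviews, (p, r) -> p + " | " + r)
                                 .get(200, TimeUnit.MILLISECONDS);
        System.out.println(fragment);
        pool.shutdown();
    }

    // Stand-ins for real service clients.
    private static String fetchProduct(String id) { return "product:" + id; }
    private static String fetchReviews(String id) { return "reviews:" + id; }
}

Even this small example shows why sequencing and combining asynchronous calls is harder than a plain blocking API call.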

Another often repeated practice is with regard to measurements, where the emphasis is on service response Mean and Median times. Variance in response times at the long tail does matter at scale – for example when 10s of servers handle millions of page view requests on a web-site. Consider the Flipkart web-site that uses PHP as the front end – each web server is configured to run a fixed maximum number of concurrent PHP processes and the number of servers is sized by the expected load on the website. Consequently, resources like CPU, Memory and Processes/Threads are limited/shared and each web-page is served by borrowing, using and immediately returning the shared resource to the pool. Each unit of work is expected to be short-lived and execute in a timely manner. Latency build-up, however small and in only a subset of services, can impact availability and user experience as shown here:

latency affecting availability

Fault Tolerance – Fail Fast, Recover Quickly
The engineering team at Flipkart built resilience into the website technology stack by having it deal with imminent failures in upstream services. The fk-w3-agent aka W3-agent daemon was already being used successfully to scale PHP and get around some of its limitations (see slide no. 77 onwards in this presentation: How Flipkart scales PHP). A detailed presentation on the evolution of the Flipkart web-site architecture is available here: Flipkart architecture: Mistakes & Learnings.
The W3-agent was redesigned to be a high performance RPC system that could serve as a transparent service proxy. A few design principles for this new system were:

  • Prevent cascading failures – Fail fast and Recover quickly
  • Provide Reasonable fallbacks around failures – the exact behavior can be service specific
  • Support for multiple protocols and codecs in order to enable transparent proxying – Unix Domain Sockets, TCP/IP and Http, Thrift
  • High performance runtime with low overhead – ability for a single local instance to handle hundreds of millions of API/Service calls per day

The fail fast and fallback behavior is entirely functional and implemented as alternate path logic by the respective service owner. The invocation of primary vs alternate path flow is at the discretion of the service proxy.

The Flipkart Phantom

Proxy servers & processes are used extensively as intermediaries for requests from clients seeking resources from other servers. There are different types of proxies, and one specific type, the Reverse Proxy, can hide the existence of origin servers: requests from clients and responses from servers are relayed back and forth in a transparent manner. The proxy also offers a runtime for implementing routing or highly localized business logic – for example executing a custom expression to sort data elements returned by the service response.

Phantom is a high performance proxy for accessing distributed services. It is an RPC system with support for different transports and protocols. Phantom is inspired by Twitter Finagle and builds on the capabilities of technologies like Netty, Unix Domain Sockets, Netflix Hystrix and Trooper (Spring).

This design diagram depicts logical layering of the Phantom tech stack and technologies used:
Phantom tech stack
The layer abstraction in the design helps to:

  • Support incoming requests using a number of protocols and transports. New ones (say UDP) may be added as needed. Mixing different incoming (e.g. Http) and outgoing (e.g. Thrift) transports is also supported.
  • Create protocol specific codecs – e.g. Http, Thrift. Adding a new Thrift proxy end-point requires only configuration edits, no code change needed.
  • Automatic wrapping of API calls with Hystrix commands, with reasonable defaults for Thread/Semaphore isolation and Thread pools. Users of Phantom are not required to program to the Hystrix API and can focus on implementing service calls and fallback behavior (a minimal Hystrix sketch follows this list). Fallback behavior is influenced by configured parameters (timeouts, thread pool size) and real time statistics comprising latent requests, thread-pool rejections and failure counts (see Hystrix’s support for this: How Hystrix works)
  • Define an API layer for calling services. This is optional and promotes request-response data driven interfaces.
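
Phantom performs this wrapping automatically; purely to show the shape of a Hystrix-wrapped call with a fallback, here is a minimal stand-alone sketch (the primary and fallback bodies are stand-ins, not a real Phantom handler):

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class ProductInfoCommand extends HystrixCommand<String> {
    private final String productId;

    public ProductInfoCommand(String productId) {
        super(HystrixCommandGroupKey.Factory.asKey("ProductService"));
        this.productId = productId;
    }

    @Override
    protected String run() {
        // Primary path: call the remote product service here.
        return "live-product-info:" + productId;
    }

    @Override
    protected String getFallback() {
        // Alternate path: degrade gracefully, e.g. serve from a local cache.
        return "cached-product-info:" + productId;
    }
}

A caller simply does new ProductInfoCommand("id-123").execute(); Hystrix runs run() on an isolated thread pool and switches to getFallback() on timeouts, errors or thread-pool rejection.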

Phantom is open source and available here: Phantom on Github

Phantom proxies have been used to serve hundreds of millions of API calls in production deployments at Flipkart. More than 1 billion Thread/Semaphore isolated API and service calls are executed on Phantom every day. The proxy processes were monitored and found to incur a marginal increase in resource utilization, while response times remained the same at the various percentiles measured.
Phantom deployment

Responding immediately to user requests – redefining the user experience
Proxies like Phantom provide the technical infrastructure for shielding an application from latencies in upstream services in a distributed SOA. The proxies are transparent to service clients & services and therefore non-intrusive. Fallback behavior for each service, however, needs to be implemented by service owners. Also, recovering from failed transactions (if required at all) is outside the scope of Phantom. For example, email campaign hits are stored in a database and the fallback behavior in case of database failure is to append this data to logs. Recovery of data from logs and appending it to the database is an operational activity implemented outside Phantom. Another example is displaying product information, where Phantom fails over to a local cache cluster if the Product Catalog Management System is down. This behavior can result in consistency issues – price changes & stock availability changes may not be reflected. The application i.e. the web-site and the end business processes (fulfillment of orders placed based on cache data) will need to change to redefine the user experience.

Making Deliveries Faster – the Flipkart cache cluster

by Vinod V, Vivek Y S & Regunath B.

The Flipkart website has grown exponentially over the last few months, if one were to go by Alexa rankings and our own internal metrics for tracking web-site traffic. While this is heartening and good for the business, it also places an unstated expectation – that of sustained quality of service at large scale.

The Flipkart motto of ‘Service, Selection and Price’, in that order, reflects in multiple things we do. The Flipkart service experience begins at the website and is treated pretty seriously. In the first of a blog series under the title “Making Deliveries Faster”, we’ll talk about the Flipkart Cache Cluster – one of the key infrastructures that helps sustain the Flipkart website experience at scale.

This blog post has three parts. The first talks about the use-cases that necessitate a cache and derives criteria for its selection. The second talks about the internals of the chosen solution (membase). The third talks about nuances in our deployment.

Part I – Why cache?

Larry Page once said: “Browsing should be as simple and fast as turning a page in a magazine”. A key metric that defines an on-line shopping experience is the user perceived response time. Note here the reference to user perception. Here is a great reference to timing attributes.

Response time for a page request can be broadly broken into the following stages:

  1. Time spent on the network
  2. Time spent by the application in preparing data for the page – application security, reading data from persistence, transformation, applying business logic, data preparation for rendering etc.
  3. Rendering the page on the client browser

As one may see, latencies introduced in any of these stages adversely affect user-perceived response times.

e-Commerce sites are rich in content and the Flipkart pages are no exception. But serving these pages is not straightforward for a variety of reasons:

  • Content for a page is personalized to improve relevance, hence needs to be dynamic
  • Pages are composed using shared elements.  For example, the product details of a fast-moving item may be shown in multiple places – the best-sellers widget, the Search Page and the Shopping Cart page.
  • A number of product attributes are subject to change – stock/availability, recent reviews, price.

All product attributes need to be read from a persistent store at runtime. This persistent store needs to provide the lowest possible latencies at scale – for millions of products and across millions of requests. And that’s where caching comes in.

Another use case worth discussing is that of User sessions. Websites use sessions to keep track of a number of things – details of the logged-in Principal and activities performed by the user on the website, including but not limited to the shopping cart. This session data too can grow into millions pretty quickly, especially if the retention period is a few days. Low latency access to session data is needed for application security and for the data preparation the application does when generating a web-page response.

Most websites start off storing and serving these data from their primary persistence stores, which is often an RDBMS. This is a very viable choice for reasons around strong data consistency and durability guarantees (ACID properties). However, over time, this infrastructure tends to become the most stressed, difficult to scale (rather, distribute) and gradually degrades into a frequently occurring single point of failure.

The difficulty in scaling almost all distributed data stores is nicely characterized by the CAP theorem. The Product Catalog and User Session use cases described above may be re-examined in the context of CAP and the data system characteristics re-defined as follows:

  • Highly Available data store where downtimes affect a section of users, at most. Better if this can be further localized to say Geographic location, User profile, Data stored and Web-pages.
  • Low Latency, High Throughput reads i.e. Serve millions of requests in tens of milliseconds.
  • Guaranteed Writes – preferably to redundant copies, else, Eventual Consistency.
  • Application-selectable guarantees on read-consistency with application driven reconciliation across versions if required.

The Flipkart use cases of Product Catalog and User Sessions diverge somewhat in their requirements around consistency, especially in user experience when deployments are heavily distributed – say multiple data centers connected by WAN.

It should also be fairly obvious that individual and cumulative data sizes are not very big and, when distributed across machines, can fit into memory. Data stores that leverage primary memory have the least latencies, as they effectively avoid disk seeks and often disk I/O altogether. A memory-based cache infrastructure therefore was deemed the most viable option, with Availability, Durability and Consistency guarantees becoming additional selection criteria.

Part II – Membase

This part concentrates on the internals of memcache (a high-performing and heavily leveraged caching solution at Flipkart), its shortcomings and how membase addresses them. It also covers a few architectural patterns used in memcache/membase that may be relevant elsewhere.

Typically memcache servers are organized as a farm of many servers. Each memcache server owns a subset of the key space. Either applications are aware of this key-to-server mapping or a client library takes care of it. Either way, keys are sharded and key-value requests are sent to the right server. Here are some key characteristics of memcache.

Constant time operations

  • Most of the memcache operations are constant time. They are independent of the number of keys or the size of the keys stored.

Least Recently Used

  • A memcache server is started with a pre-determined amount of space. As long as all of the key/values and their metadata fit in that pre-determined memory, the memcache server goes on allocating memory for the key-value pairs. But once all of the pre-determined memory is used, memcache starts to evict key-values from memory to allocate space for new key-value pairs. Memcache uses an LRU policy to evict the keys.
  • Internally it uses a counter, which is a timestamp of last access time, to figure out the key to evict.

Memory allocation

  • A single memcache server can easily saturate a 1Gbps network link. It can easily perform 100K operations (gets and puts) per second. Ordinarily this would result in terrible memory fragmentation. But surprisingly this does not happen on memcache servers.
  • Memcache is able to achieve this by having its own memory management system, called slab allocation. It is very similar to slab allocation in the Linux kernel. Let me explain.
  • When the memcache server is started with the very verbose option we see something like this:
slab class 1: chunk size 80 per slab 13107
slab class 2: chunk size 100 per slab 10485
slab class 3: chunk size 128 per slab 8192
slab class 4: chunk size 160 per slab 6553
  • When the memcache server is started, it partitions all of the allocated memory into pages assigned to a slab class. Each page is 1MB in size, which coincides with the max size of a key-value pair. Each page is divided into chunks of the same size. There can be multiple pages with the same chunk size, but once a page is assigned to a slab class it cannot be re-assigned. For example there can be 10 pages with slab class 3, i.e. with a chunk size of 128 bytes.
  • The smallest chunk size starts from 80 bytes, and subsequent chunk sizes grow by a factor of 1.25 (rounded up slightly where needed), as the 80, 100, 128, 160 progression above shows (see the short sketch after this list).
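
As a small illustration of how slab classes are used (chunk sizes taken from the verbose listing above; the lookup itself is illustrative, not memcache’s actual code), an incoming item simply lands in the smallest slab class whose chunk can hold it:

public class SlabClassLookup {
    // Chunk sizes (bytes) from the verbose listing above.
    private static final int[] CHUNK_SIZES = {80, 100, 128, 160};

    // Return the 1-based slab class whose chunk is the smallest that fits the item.
    static int slabClassFor(int itemSizeBytes) {
        for (int i = 0; i < CHUNK_SIZES.length; i++) {
            if (itemSizeBytes <= CHUNK_SIZES[i]) return i + 1;
        }
        throw new IllegalArgumentException("item larger than the largest chunk in this sketch");
    }

    public static void main(String[] args) {
        System.out.println(slabClassFor(90));   // 2: a 90-byte item goes into a 100-byte chunk
        System.out.println(slabClassFor(128));  // 3
    }
}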

Let’s take a hypothetical scenario and see how memory allocation happens in memcache.

  1. Start the memcache server with 100MB of space. It requests 100MB of heap space from the OS and starts to manage it. It creates 1 page of each slab class. The remaining memory is reserved without partitioning.
  2. When a client starts inserting key-value pairs, the server allocates from the slabs created above.
  3. Once all the chunks in a particular slab are used, another page is created from the unused memory and assigned to that particular slab class.
  4. One interesting consequence of allocating space like this is how the LRU algorithm behaves. Let’s take another hypothetical scenario to understand this.
  5. Just like the previous scenario, let’s start the memcache server with 100MB of space. It initializes the memory.
  6. Say the client inserts only 128-byte key-value pairs until all of the 100MB is used up.
  7. Then the client inserts a 1MB key-value pair. It uses the already created page of the slab class with a 1MB chunk size.
  8. If the client inserts another 1MB key-value pair, memcache evicts the previously inserted 1MB key-value pair, even though there are many 128-byte key-value pairs that were inserted before the 1MB key-value pair.
  9. From the above scenario it is clear that LRU is applied per slab class.

Some of the major shortcomings of memcache for our use cases:

  • Horizontal scaling – Whenever a new machine needs to be added for horizontal scaling, key ownership changes unless applications or client libraries are using consistent hashing.
  • Availability – When a memcache server crashes or is unavailable due to a network partition, the cache is not available. One way to solve this problem is replication. But when replication is introduced, another challenge arises: consistency.
  • Persistence – This is not a design goal for memcache.

Membase took a different approach and solves all of the above 3 problems with a single solution (called vBucket). Before describing the internals of vBucket, let’s start with some of the design goals membase aimed to achieve:

  • Never service a request on the wrong server.
  • Allow scaling up and down at will.
  • Servers refuse commands that they should not service, but
  • Servers still do not know about each other.
  • We can hand data sets from one server to another atomically, but
  • There are no temporal constraints.
  • Consistency is guaranteed.
  • Absolutely no network overhead is introduced in the normal case.

To achieve the above design goals, Membase has a cluster management server called ns_server (NorthScale server), written in Erlang.

Now, back to vBuckets.  A vBucket is conceptually a computed subset of all possible keys.


One could think of it as 2-level hashing, with the first level of hashing computed dynamically and the second level mapped statically. The number of vBuckets in the cluster remains constant and is independent of the cluster topology. This means a key, say x, always maps to the same vBucket as long as the same hash function is used.

This is a nice architectural pattern to achieve the effects of consistent hashing, without implementing a consistent hashing algorithm.
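
A minimal sketch of that two-level idea (the hash function, vBucket count and map contents here are made up for illustration; membase uses its own hash and a cluster-managed map):

import java.util.zip.CRC32;

public class VBucketRouting {
    private static final int NUM_VBUCKETS = 6;  // deliberately tiny, as in the walkthrough below

    // Level 1: computed dynamically from the key; a given key always maps to the same vBucket.
    static int vbucketFor(String key) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes());
        return (int) (crc.getValue() % NUM_VBUCKETS);
    }

    // Level 2: a static vBucket-to-server map, changed only when the cluster is rebalanced.
    private static final String[] VBUCKET_TO_SERVER = {
        "serverA", "serverA", "serverA", "serverB", "serverB", "serverB"
    };

    static String serverFor(String key) {
        return VBUCKET_TO_SERVER[vbucketFor(key)];
    }

    public static void main(String[] args) {
        System.out.println("x -> vBucket " + vbucketFor("x") + " on " + serverFor("x"));
    }
}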

There are a few terms that need to be defined before the vBucket implementation can be explained.
Cluster : A collection of membase servers.
Server : An individual machine within the cluster.
vBucket : A subset of all possible keys.

Also, a vBucket can be in any one of the following states:
Active : This server is servicing all the requests for this vBucket.
Dead : This server is not responsible for this vBucket.
Replica : Receives only replication commands but does not serve any clients.
Pending : This server will block all requests for this vBucket.

Let us look at the vBucket state transitions and how horizontal scaling is achieved by this.


Initially the membase cluster contains a single server. Typically the number of vBuckets in a deployment varies between 1024 and 4096; the maximum number of vBuckets is 65536. For the sake of simplicity let us take 6 vBuckets.

Now let’s add another server. So there is one active server and another, new server.

Adding a new server will not unbalance the tree. All the vBuckets on the new server come up in the dead state.

In order to make the new server usable, a rebalance of the vBucket map has to be done. The rebalance process transfers vBuckets from the old server to the new server. This is done by an asynchronous thread. The rebalance process selects a subset of vBuckets that the new server should serve and sets them to the pending state. Then data is sent from the old server and placed into the new server. Once the transfer is complete, the state of the vBucket is set to active on the new server and dead on the old server.

This is the exact order in which the operations are performed:

  • The vBucket on the new server is placed in pending state.
  • A vBucket extract tap stream is started.
  • The vBucket tap stream atomically sets the state to dead when the queue is in a sufficient drain state.
  • The new server only transitions from the pending to the active state after it receives confirmation that the old server is no longer servicing requests.

By performing the operations in this exact order, one can guarantee that no more than one server is active for any given vBucket at any given point in time, without any regard to actual chronology. (This is a major difference between data stores like Riak and Membase; Riak uses vector clocks to achieve a similar result.)

You may notice that there is a very short time period where a vBucket has no active server at all. This occurs at the very end of the transfer mechanism and causes blocking to occur. Clients rarely notice this.

High availability in membase is achieved by replication. When replication is enabled and a new server is added, some of the vBuckets’ state is changed to replica. A replica vBucket is like a dead vBucket from the client’s perspective: it does not serve clients, but it listens to replication commands.

If a server crashes, the static vBucket-to-server map is changed. The vBuckets owned by the crashed server are activated on the other servers in the cluster: those vBuckets in the replica state are changed to the active state so that clients can be served. This change happens so quickly that controlled failovers will never result in client request failures.

Finally, it is clear that vBucket solves two of the three shortcomings of memcache (availability and horizontal scaling).

Persistence in membase is an extension to memcache. Memcache has a module called the engine interface; using the engine interface, a storage engine can be implemented. The complete RFC can be found here.

One of the best parts of the engine interface is that it is completely asynchronous, and memcache can be started with any storage engine that implements the engine interface. Currently Membase uses SQLite as the storage engine. With Couchbase 2.0 this will be replaced with CouchDB’s storage engine.

Part III – Nuances of Membase use at Flipkart

The Flipkart systems differ slightly in their usage of the Membase cache cluster. The difference is primarily around detecting and explicitly handling data consistency between the sources of data, the values in the cache and the data as seen by clients. One such instance (Product Catalog service) is highlighted below:

  • Use the Flipkart cache wrapper library (Prometheus) to perform all cache operations. Prometheus was primarily written to support connection pooling.
  • Use increment/decrement methods to implement atomic counters, which allows versioning across different backend servers.
  • Use add (put if absent) and cas (compare and set) operations, which allow for list and set semantics shared across multiple clients (see the sketch after this list).
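
Prometheus itself is internal to Flipkart, so the sketch below uses a hypothetical memcached-style client interface purely to illustrate the add (put if absent) plus cas (compare and set) pattern for a set shared across clients:

import java.util.HashSet;
import java.util.Set;

// Hypothetical memcached-style client; names here are made up for illustration only.
interface KVClient {
    boolean add(String key, Object value);                 // put if absent
    Versioned get(String key);                             // value plus its CAS version
    boolean cas(String key, long version, Object value);   // write only if version is unchanged
}

final class Versioned {
    final long version;
    final Object value;
    Versioned(long version, Object value) { this.version = version; this.value = value; }
}

public class SharedSetExample {
    // Add a member to a set shared by many clients, retrying on CAS conflicts.
    static void addToSharedSet(KVClient client, String key, String member) {
        while (true) {
            Versioned current = client.get(key);
            if (current == null) {
                Set<String> fresh = new HashSet<>();
                fresh.add(member);
                if (client.add(key, fresh)) return;  // we created the set atomically
                continue;                            // someone else created it first; re-read and retry
            }
            @SuppressWarnings("unchecked")
            Set<String> updated = new HashSet<>((Set<String>) current.value);
            updated.add(member);
            if (client.cas(key, current.version, updated)) return;
            // CAS failed: another client changed the set concurrently; re-read and retry.
        }
    }
}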

Shifting gears: one of the interesting properties observed on our production cluster was cache misses on Membase buckets when the particular bucket was using only about 70% of its allocated memory. The problem was that there was no free memory at the OS level; the OS was caching I/O objects.

To circumvent this problem a simple solution was adopted: reduce the swappiness of the Linux virtual memory system and regularly flush the OS cache.

Acknowledgment
Most of the content about the internals of Membase is based on the memcache and membase source code, documentation from Couchbase, and blog posts from Couchbase developers.