How to build a scalable streaming data pipeline for financial big data
The concepts and solutions described in this article were developed for BidFlow Technologies' software ecosystem and are deployed in production.
Trading the financial markets systematically with a purely data-driven approach requires a powerful data science environment with access to a flexible, scalable, unified source of historical financial data. Since financial decisions are made purely on data, and since simulating and stress testing a systematic trading system's historical performance is the only way to estimate its potential future performance, historical financial data has to be of immaculate quality. The more the data diverges from the real past due to data loss, duplicates, numerical errors and inconsistencies, the less meaningful performance simulations and research results become. This can have a significant negative impact on live trading operations: risk might be misjudged, which in turn can cause systems to be taken live that add no or even negative value.
High-quality historical financial data offers a direct competitive advantage in the systematic trading business, but its big data characteristics make it both costly to manage and store and expensive to buy from external sources. To mitigate part of these expenses, a custom, self-managed data lake can be built by recording and storing real-time financial data through a scalable streaming data pipeline. Such a streaming data pipeline was implemented at BidFlow Technologies and enables us to integrate an ever-growing collection of high-quality historical financial data (currently hundreds of gigabytes in size) into our proprietary data science environment.
This post discusses the details of designing and building a scalable streaming data pipeline with AWS. The informative, well-written post Collect and distribute high-resolution crypto market data with ECS, S3, Athena, Lambda, and AWS Data Exchange on the AWS Big Data Blog provided fundamental ideas and is a recommended read for everyone interested in big data solutions.
High-level concept
Data streams
Nowadays, exchanges, brokers and data vendors provide real-time financial data via data streams to which consumers can subscribe. However, in the world of finance, underlying technology stacks differ vastly from business to business. Data streams from various sources cannot be consumed in a unified way, as separate streams often require different subscription protocols (e.g. FIX, WebSocket, Lightstreamer) and work with custom APIs. In addition, financial data can come in many different forms (high variety). Hence, each data stream ingested into the streaming data pipeline requires specific treatment.
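To make this concrete, the sketch below subscribes to a Deribit-style ticker channel over WebSocket using Python and the websockets package. It is a minimal, hypothetical example: the endpoint, channel name and message handling follow Deribit's public JSON-RPC conventions, but it is meant as an illustration rather than production code.

# Minimal, hypothetical sketch of a WebSocket stream subscription
# (Deribit-style JSON-RPC); endpoint and channel name are illustrative.
import asyncio
import json

import websockets  # third-party package: pip install websockets


async def consume_ticker(channel: str = "ticker.BTC-PERPETUAL.100ms") -> None:
    url = "wss://www.deribit.com/ws/api/v2"
    async with websockets.connect(url) as ws:
        # Subscribe to the desired channel.
        await ws.send(json.dumps({
            "jsonrpc": "2.0",
            "id": 1,
            "method": "public/subscribe",
            "params": {"channels": [channel]},
        }))
        async for raw in ws:
            message = json.loads(raw)
            # Subscription notifications carry the actual tick payload.
            if message.get("method") == "subscription":
                tick = message["params"]["data"]
                print(tick["timestamp"], tick["best_bid_price"], tick["best_ask_price"])


if __name__ == "__main__":
    asyncio.run(consume_ticker())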
Data ingestion
The required specific treatment of each third-party data stream prevents the use of a static, unified data ingester. Therefore, an Amazon ECS cluster is chosen as the base layer. Data ingesters are developed and configured per data stream (flexibility) and deployed side by side in the cluster, while the underlying computational resources are controlled and scaled based on current resource requirements (scalability). Logs are written to Amazon CloudWatch to provide in-depth insights into the data ingesters. If required, alerts can be configured for special events.
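As a rough illustration of how a per-stream ingester could be deployed, the hypothetical boto3 snippet below registers an ECS (Fargate) task definition whose logs are shipped to CloudWatch via the awslogs driver. All names (task family, image, role, log group, region) are placeholders.

# Hypothetical sketch: one ECS (Fargate) task definition per data stream,
# with logs shipped to Amazon CloudWatch; all names are placeholders.
import boto3

ecs = boto3.client("ecs")

ecs.register_task_definition(
    family="ingester-deribit-btc-perpetual",
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="256",
    memory="512",
    executionRoleArn="arn:aws:iam::<account>:role/ecsTaskExecutionRole",
    containerDefinitions=[
        {
            "name": "ingester",
            "image": "<account>.dkr.ecr.<region>.amazonaws.com/ingester:latest",
            "essential": True,
            # One ingester image, configured per data stream via environment.
            "environment": [
                {"name": "STREAM", "value": "DERIBIT"},
                {"name": "INSTRUMENT", "value": "BTC-PERPETUAL"},
            ],
            "logConfiguration": {
                "logDriver": "awslogs",
                "options": {
                    "awslogs-group": "/ecs/ingesters",
                    "awslogs-region": "<region>",
                    "awslogs-stream-prefix": "deribit",
                },
            },
        }
    ],
)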
Data normalization
Next to connecting to data streams, the data ingesters also handle data normalization. The raw data records are transformed with internally defined mappings to streamline data variety. The exemplary snippets below show the shape in which raw financial data arrives and what its normalized counterpart looks like.
BID: 35328.9 OFR: 35418.9 LTV: 1 UTM: 1698930198693 DAY_OPEN_MID: 35416.9 DAY_NET_CHG_MID: -43 DAY_PERC_CHG_MID: -0.12 DAY_HIGH: 35999.7 DAY_LOW: 35032.7
serverTimestamp: 1698930198693 clientTimestamp: 1698930198701 bidPrice: 35328.9 askPrice: 35418.9 lastTradedPrice: null lastTradedVolume: 1 incrementalTradingVolume: null dayOpenMid: 35416.9 dayNetChangeMid: -43.0 dayPercentChangeMid: -0.12 dayHigh: 35999.7 dayLow: 35032.7
Another example: the trading platform Deribit offers both futures and options on cryptocurrencies. The shapes of raw futures and options ticks are similar but not identical; in this case, both are mapped to the same unified shape.
{ "best_ask_amount": 80980, "best_ask_price": 35406.5, "best_bid_amount": 1460, "best_bid_price": 35406, "current_funding": 0.00032169, "estimated_delivery_price": 35386.52, "funding_8h": 0.00043324, "index_price": 35386.52, "instrument_name": "BTC-PERPETUAL", "interest_value": 0.10242901739473433, "last_price": 35406, "mark_price": 35406.75, "max_price": 35937.79, "min_price": 34875.59, "open_interest": 732616360, "settlement_price": 35320.3, "state": "open", "stats": { "high": 36030, "low": 34102, "price_change": 2.9244, "volume": 24605.14153068, "volume_notional": 863326930, "volume_usd": 863326930 }, "timestamp": 1698922852775 }
serverTimestamp: 1698922852775 clientTimestamp: 1698922852792 indexPrice: 35386.52 orderBookState: "open" markPrice: 35406.75 markImpliedVolatility: null bidPrice: 35406.0 bidAmount: 1460.0 bidImpliedVolatility: null askPrice: 35406.5 askAmount: 80980.0 askImpliedVolatility: null lastTradedPrice: 35406.0 minPrice: 34875.59 maxPrice: 35937.79 settlementPrice: 35320.3 estimatedDeliveryPrice: 35386.52 deliveryPrice: null underlyingIndex: null underlyingPrice: null currentFunding: 0.00032169 funding8h: 0.00043324 openInterest: 732616360.0 volume24h: 24605.14153068 volumeUsd24h: 863326930.0 pricePercentChange24h: 2.9244 high24h: 36030.0 low24h: 34102.0 delta: null gamma: null vega: null theta: null rho: null interestRate: 0.10242901739473433
{ "ask_iv": 55.33, "best_ask_amount": 19, "best_ask_price": 0.033, "best_bid_amount": 0.5, "best_bid_price": 0.0325, "bid_iv": 54.48, "estimated_delivery_price": 35402.48, "greeks": { "delta": 0.52191, "gamma": 0.00014, "rho": 3.75352, "theta": -72.15906, "vega": 20.79957 }, "index_price": 35402.48, "instrument_name": "BTC-10NOV23-35500-C", "interest_rate": 0, "last_price": 0.0345, "mark_iv": 54.66, "mark_price": 0.0326, "max_price": 0.081, "min_price": 0.0046, "open_interest": 148.5, "settlement_price": 0.03138989, "state": "open", "stats": { "high": 0.04, "low": 0.019, "price_change": 72.5, "volume": 65.9, "volume_usd": 70539.76 }, "timestamp": 1698922588675, "underlying_index": "BTC-10NOV23", "underlying_price": 35542.24 }
serverTimestamp: 1698922588675 clientTimestamp: 1698922588690 indexPrice: 35402.48 orderBookState: "open" markPrice: 0.0326 markImpliedVolatility: 54.66 bidPrice: 0.0325 bidAmount: 0.5 bidImpliedVolatility: 54.48 askPrice: 0.033 askAmount: 19.0 askImpliedVolatility: 55.33 lastTradedPrice: 0.0345 minPrice: 0.0046 maxPrice: 0.081 settlementPrice: 0.03138989 estimatedDeliveryPrice: 35402.48 deliveryPrice: null underlyingIndex: "BTC-10NOV23" underlyingPrice: 35542.24 currentFunding: null funding8h: null openInterest: 148.5 volume24h: 65.9 volumeUsd24h: 70539.76 pricePercentChange24h: 72.5 high24h: 0.04 low24h: 0.019 delta: 0.52191 gamma: 0.00014 vega: 20.79957 theta: -72.15906 rho: 3.75352 interestRate: 0.0
Overall, extra effort is spent to ensure that data of the same type is mapped to identically named fields across different data streams (e.g. current bid and ask prices to "bidPrice" and "askPrice").
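A minimal sketch of what such a mapping could look like is shown below; the mapping table and the normalize helper are hypothetical and only cover the IG quote fields from the first example above.

# Hypothetical per-stream normalization mapping: raw IG quote fields are
# renamed to the unified schema and a client timestamp is added.
import time
from typing import Any, Dict, Optional

IG_QUOTE_MAPPING = {
    "UTM": "serverTimestamp",
    "BID": "bidPrice",
    "OFR": "askPrice",
    "LTV": "lastTradedVolume",
    "DAY_OPEN_MID": "dayOpenMid",
    "DAY_NET_CHG_MID": "dayNetChangeMid",
    "DAY_PERC_CHG_MID": "dayPercentChangeMid",
    "DAY_HIGH": "dayHigh",
    "DAY_LOW": "dayLow",
}


def normalize(raw: Dict[str, Any], mapping: Dict[str, str]) -> Dict[str, Optional[Any]]:
    """Rename raw fields to the unified schema; missing fields become None."""
    record = {unified: raw.get(source) for source, unified in mapping.items()}
    record["clientTimestamp"] = int(time.time() * 1000)
    return record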
Data storage
At this point, data ingestion and data normalization can be handled on a flexible and scalable basis. The last stage in the streaming data pipeline is data retention and structuring, which ultimately makes a growing data lake available to data science environments. For this, the data storage use case as well as the features of different cloud storage solutions have to be analyzed thoroughly. It is strongly advised to ensure that prospective data storage requirements are very likely to be met by the chosen cloud storage, to prevent a cumbersome data lake migration to a better-fitting solution in the future. Let's have a look at the characteristics of financial big data to make an informed decision:
An important property most financial data has in common is that it is time-sensitive. Different readings at different points in time form time series, with the latest reading always being the most relevant one. For this kind of data, the Comma-Separated Values (CSV) file format is convenient. It can be used to store time series data row-wise, sorted by point in time. CSV files can also have an arbitrary number of columns (though constant within each file), which makes them a good fit for the high variety in financial data. They also integrate well with data science packages like pandas for Python. For storing financial time series this way, Amazon S3, Amazon's object storage service, is a simple yet effective solution and offers powerful scalability.
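As a small illustration of that integration, a gzipped CSV chunk stored in S3 can be loaded directly into a pandas DataFrame. The example is hypothetical: it assumes the s3fs package is installed so pandas can resolve s3:// URLs, and the bucket and path are placeholders.

# Hypothetical example: load one day of normalized quotes from S3 into pandas
# (requires the s3fs package for "s3://" URLs; bucket and path are placeholders).
import pandas as pd

quotes = pd.read_csv(
    "s3://<bucket>/<prefix>/IG_quotes_CS.D.BITCOIN.CFE.IP_2023_11_12.csv.gz",
    compression="gzip",
)
quotes = quotes.sort_values("serverTimestamp")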
Alternatively, a NoSQL database solution like Amazon DynamoDB could be used, storing streamed data in real time and also supporting high data variety as well as throughput and storage scaling. In that case, the data would not have to be stored in files but would be inserted directly into the database.
However, another important factor is cost. Financial data often arrives at high frequency (especially real-time price data, which can generate hundreds of data points per second). This makes a NoSQL database a very expensive solution, as it is charged by throughput or per CRUD operation. Amazon S3 is charged per PUT operation as well, but it allows data points to be buffered in memory and written to S3 in chunks, which gives direct control over the cost factor. The longer data is buffered in memory, the lower the cost (data is written to S3 less frequently) but the more in-memory data is lost if a data ingester crashes; the shorter data is buffered, the smaller the potential loss but the higher the cost.
Hence, with Amazon S3 it is possible to get the best of both worlds: high cost-efficiency as well as high data durability and scalability. Amazon S3 also integrates with powerful analytics services (like Amazon Athena) and is therefore chosen as the data storage solution for the streaming data pipeline.
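A minimal sketch of the buffering strategy described above is shown below, assuming a hypothetical ChunkBuffer that collects normalized records in memory and flushes them to S3 as a gzipped CSV chunk once a configurable interval has elapsed; bucket name and key prefix are placeholders.

# Hypothetical in-memory buffer that flushes normalized records to S3 as
# gzipped CSV chunks; bucket name and key prefix are placeholders.
import csv
import gzip
import io
import time
from typing import Any, Dict, List

import boto3

s3 = boto3.client("s3")


class ChunkBuffer:
    def __init__(self, bucket: str, key_prefix: str, flush_seconds: int = 60):
        self.bucket = bucket
        self.key_prefix = key_prefix
        self.flush_seconds = flush_seconds
        self.records: List[Dict[str, Any]] = []
        self.last_flush = time.time()

    def add(self, record: Dict[str, Any]) -> None:
        self.records.append(record)
        # Longer flush intervals mean fewer PUT operations (lower cost) but
        # more in-memory data at risk if the ingester crashes.
        if time.time() - self.last_flush >= self.flush_seconds:
            self.flush()

    def flush(self) -> None:
        if not self.records:
            return
        out = io.StringIO()
        writer = csv.DictWriter(out, fieldnames=list(self.records[0].keys()))
        writer.writeheader()
        writer.writerows(self.records)
        body = gzip.compress(out.getvalue().encode("utf-8"))
        # Chunk filenames carry the first and last record's millisecond timestamp.
        key = (f"{self.key_prefix}/"
               f"{self.records[0]['serverTimestamp']}_{self.records[-1]['serverTimestamp']}.csv.gz")
        s3.put_object(Bucket=self.bucket, Key=key, Body=body)
        self.records.clear()
        self.last_flush = time.time()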
Data structuring
A consistent and sensible structure of the data lake is necessary to perform frictionless data refinement and big data analytics on it. In addition, the data lake structure has to be scalable such that arbitrary new data can be retained using the existing structure. To meet these requirements, the following object prefix template is chosen:
.../provider=<provider>/type=<type>/instrument=<instrument>/year=<year>/month=<month>/day=<day>/<filename>
provider: the data provider (e.g. “IG”, “DERIBIT”)
type: the data type (e.g. “quotes”, “rates”)
instrument: the traded instrument or exact type of data (e.g. “CS.D.BITCOIN.CFE.IP”, “BTC-10APR23-25500-C”, “ESTR”, “SOFR”)
year: the year of the data (e.g. “2022”, “2023”)
month: the month of the data (e.g. “4”, “11”)
day: the day of the data (e.g. “1”, “12”)
filename: the filename (e.g. “IG_quotes_CS.D.BITCOIN.CFE.IP_2023_11_12.csv.gz”, “DERIBIT_quotes_BTC-11OCT23-29000-C_2023_10_11.csv.gz”)
The prefix template utilizes a notation that is understood and used by Amazon Athena to partition the data lake. Data partitioning in Athena dynamically restricts the amount of data scanned per query, thereby improving performance and reducing cost. An example of a data lake object using the described prefix template is shown below.
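The hypothetical helper below assembles such a key from the template and prints the resulting example object prefix (the leading, bucket-specific part of the prefix is omitted, as in the template above).

# Hypothetical helper that assembles an object key following the prefix
# template; the leading, bucket-specific part of the prefix is omitted.
def build_key(provider: str, type_: str, instrument: str,
              year: int, month: int, day: int) -> str:
    filename = f"{provider}_{type_}_{instrument}_{year}_{month}_{day}.csv.gz"
    return (f"provider={provider}/type={type_}/instrument={instrument}/"
            f"year={year}/month={month}/day={day}/{filename}")


print(build_key("IG", "quotes", "CS.D.BITCOIN.CFE.IP", 2023, 11, 12))
# provider=IG/type=quotes/instrument=CS.D.BITCOIN.CFE.IP/year=2023/month=11/day=12/IG_quotes_CS.D.BITCOIN.CFE.IP_2023_11_12.csv.gz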
Data refinement
At this point the streaming data pipeline is complete and handles data streaming as well as normalization and retention in a scalable and cost-effective way. However, as we live in an imperfect world, the streaming data pipeline is subject to connection loss (to ingested third-party data streams) as well as data ingester crashes and cloud service malfunctions. All of these failures cause minor or even major losses of streamed real-time data, and data recovery can become tedious or even impossible (depending on the interfaces third-party APIs offer).
To reduce the impact of these problems and to minimize data loss, the streaming data pipeline runs redundantly as a primary and a replica instance. The data ingesters of these instances run in explicitly defined, different availability zones. This setup results in two separate but very similar, or even identical, data streams being written to Amazon S3. The pictures below show chunks of data retained by the primary and replica streaming data pipeline instances. Pay attention to the source identifier in the object prefixes, which is either primary or replica.
The pictures also reveal that, for this particular instrument, data chunks are buffered and stored at one-minute intervals. The chunks' filenames consist of the first and the last data point's millisecond timestamp within each chunk.
To produce refined daily chunks of data as anticipated in the previous section, the chunks from both the primary and the replica data stream have to be coalesced. This procedure fills data gaps that appear in only one of the two data streams and deduplicates redundant data points, forming chunks of data with potentially higher completeness.
To realize this functionality, Amazon ECS tasks run on a daily basis, scanning the previous day's retained data and utilizing Amazon Athena to generate the refined data chunks. That way, new data is added to the data lake once per day. To increase cost and storage efficiency, all streamed data chunks from both the primary and the replica streaming data pipeline are set to expire (be permanently deleted) after a month. By then, the data has already been processed and added to the data lake in its final form.
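Such an expiration policy can be expressed as an S3 lifecycle rule; the hypothetical boto3 sketch below sets raw streamed chunks to expire after 30 days (bucket name and prefix filter are placeholders).

# Hypothetical S3 lifecycle rule that expires raw streamed chunks after
# 30 days; bucket name and prefix filter are placeholders.
import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="<bucket>-primary",
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-raw-streamed-chunks",
                "Filter": {"Prefix": "provider="},
                "Status": "Enabled",
                "Expiration": {"Days": 30},
            }
        ]
    },
)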
The exemplary query below shows how Athena can be leveraged to execute the rather complex procedure of coalescing the primary and replica data streams in a single statement.
SELECT
    coalesce(p.serverTimestamp, r.serverTimestamp) AS serverTimestamp,
    coalesce(p.clientTimestamp, r.clientTimestamp) AS clientTimestamp,
    coalesce(p.bidPrice, r.bidPrice) AS bidPrice,
    coalesce(p.askPrice, r.askPrice) AS askPrice,
    coalesce(p.lastTradedPrice, r.lastTradedPrice) AS lastTradedPrice,
    coalesce(p.lastTradedVolume, r.lastTradedVolume) AS lastTradedVolume,
    coalesce(p.incrementalTradingVolume, r.incrementalTradingVolume) AS incrementalTradingVolume,
    coalesce(p.dayOpenMid, r.dayOpenMid) AS dayOpenMid,
    coalesce(p.dayNetChangeMid, r.dayNetChangeMid) AS dayNetChangeMid,
    coalesce(p.dayPercentChangeMid, r.dayPercentChangeMid) AS dayPercentChangeMid,
    coalesce(p.dayHigh, r.dayHigh) AS dayHigh,
    coalesce(p.dayLow, r.dayLow) AS dayLow
FROM (
    SELECT serverTimestamp, clientTimestamp, bidPrice, askPrice, lastTradedPrice,
           lastTradedVolume, incrementalTradingVolume, dayOpenMid, dayNetChangeMid,
           dayPercentChangeMid, dayHigh, dayLow
    FROM "<redacted>_primary"."ig_quotes"
    WHERE instrument = 'CS.D.BITCOIN.CFE.IP' AND year = 2023 AND month = 11 AND day = 12
) AS p
FULL JOIN (
    SELECT serverTimestamp, clientTimestamp, bidPrice, askPrice, lastTradedPrice,
           lastTradedVolume, incrementalTradingVolume, dayOpenMid, dayNetChangeMid,
           dayPercentChangeMid, dayHigh, dayLow
    FROM "<redacted>_replica"."ig_quotes"
    WHERE instrument = 'CS.D.BITCOIN.CFE.IP' AND year = 2023 AND month = 11 AND day = 12
) AS r
ON p.serverTimestamp = r.serverTimestamp
ORDER BY serverTimestamp ASC
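The daily refinement task can submit such a query programmatically; the minimal, hypothetical boto3 sketch below starts the query and polls until Athena has written the result set to the configured S3 output location (query string, database and output location are placeholders).

# Hypothetical sketch: submit the coalescing query to Athena and wait for it
# to finish; query string, database and output location are placeholders.
import time

import boto3

athena = boto3.client("athena")

COALESCE_QUERY = "..."  # placeholder: the full coalescing query shown above

execution = athena.start_query_execution(
    QueryString=COALESCE_QUERY,
    QueryExecutionContext={"Database": "<redacted>_primary"},
    ResultConfiguration={"OutputLocation": "s3://<bucket>/athena-results/"},
)
execution_id = execution["QueryExecutionId"]

# Poll until the query reaches a terminal state; Athena writes the result set
# to the configured output location as a CSV file.
while True:
    status = athena.get_query_execution(QueryExecutionId=execution_id)
    state = status["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(5)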
Conclusion
This post gave a bird's-eye view of how a cloud-based streaming data pipeline for financial big data can be implemented while ensuring that it remains scalable in the future with very few technical constraints. Every pipeline stage, from frictionless data ingestion to durable data retention, was discussed and workable concepts were presented. Based on these, a real-world version of the streaming data pipeline was implemented at BidFlow Technologies and is running robustly in production, proving the quality of the presented solution.