Implement streaming
This article is part of a tutorial about implementing Datafeed API. We recommend that you follow the guide from the start.
At this stage, you will implement real-time data updates via WebSocket. You will know how to:
- Connect to streaming and unsubscribe from it.
- Subscribe for data updates and handle them.
Step 1. Connect to streaming
To connect your datafeed to the streaming API:
Add the
socket.io
script to theindex.html
page. Note that you need to place it before themain.js
script./chart‑project/index.html<head>
<!-- ... -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.5.4/socket.io.js"></script>
<!-- ... -->
</head>Create a new file called
streaming.js
, where you will implement a connection to WebSocket./chart‑project/src/streaming.jsconst socket = io('wss://streamer.cryptocompare.com');
socket.on('connect', () => {
console.log('[socket] Connected');
});
socket.on('disconnect', (reason) => {
console.log('[socket] Disconnected:', reason);
});
socket.on('error', (error) => {
console.log('[socket] Error:', error);
});
export function subscribeOnStream() {
// To Do
}
export function unsubscribeFromStream() {
// To Do
}Import the functions from
streaming.js
intodatafeed.js
:/chart‑project/src/datafeed.jsimport { subscribeOnStream, unsubscribeFromStream } from './streaming.js';
To subscribe for real-time data updates for a symbol, implement
subscribeBars
. the library calls it every time the chart symbol or resolution is changed, or when the chart needs to subscribe to a new symbol./chart‑project/src/datafeed.js// ...
// Use it to keep a record of the most recent bar on the chart
const lastBarsCache = new Map();
// ...
export default {
// ...
subscribeBars: (
symbolInfo,
resolution,
onRealtimeCallback,
subscriberUID,
onResetCacheNeededCallback
) => {
console.log('[subscribeBars]: Method call with subscriberUID:', subscriberUID);
subscribeOnStream(
symbolInfo,
resolution,
onRealtimeCallback,
subscriberUID,
onResetCacheNeededCallback,
lastBarsCache.get(symbolInfo.full_name)
);
},
};Implement
unsubscribeBars
to stop receiving updates for the symbol when a user selects another symbol on the chart./chart‑project/src/datafeed.jsunsubscribeBars: (subscriberUID) => {
console.log('[unsubscribeBars]: Method call with subscriberUID:', subscriberUID);
unsubscribeFromStream(subscriberUID);
},
Step 2. Subscribe for updates
On the previous step, you connected your datafeed to WebSocket. Now, you need to subscribe to the channels to receive updates:
Import
parseFullSymbol
fromhelpers.js
intostreaming.js
./chart‑project/src/streaming.jsimport { parseFullSymbol } from './helpers.js';
Implement
subscribeOnStream
to subscribe for updates./chart‑project/src/streaming.js// ...
const channelToSubscription = new Map();
// ...
export function subscribeOnStream(
symbolInfo,
resolution,
onRealtimeCallback,
subscriberUID,
onResetCacheNeededCallback,
lastDailyBar
)
{
const parsedSymbol = parseFullSymbol(symbolInfo.full_name);
const channelString = `0~${parsedSymbol.exchange}~${parsedSymbol.fromSymbol}~${parsedSymbol.toSymbol}`;
const handler = {
id: subscriberUID,
callback: onRealtimeCallback,
};
let subscriptionItem = channelToSubscription.get(channelString);
if (subscriptionItem) {
// Already subscribed to the channel, use the existing subscription
subscriptionItem.handlers.push(handler);
return;
}
subscriptionItem = {
subscriberUID,
resolution,
lastDailyBar,
handlers: [handler],
};
channelToSubscription.set(channelString, subscriptionItem);
console.log('[subscribeBars]: Subscribe to streaming. Channel:', channelString);
socket.emit('SubAdd', { subs: [channelString] });
}
Step 3. Unsubscribe from streaming
Implement unsubscribeFromStream
to unsubscribe from streaming:
export function unsubscribeFromStream(subscriberUID) {
// Find a subscription with id === subscriberUID
for (const channelString of channelToSubscription.keys()) {
const subscriptionItem = channelToSubscription.get(channelString);
const handlerIndex = subscriptionItem.handlers
.findIndex(handler => handler.id === subscriberUID);
if (handlerIndex !== -1) {
// Remove from handlers
subscriptionItem.handlers.splice(handlerIndex, 1);
if (subscriptionItem.handlers.length === 0) {
// Unsubscribe from the channel if it is the last handler
console.log('[unsubscribeBars]: Unsubscribe from streaming. Channel:', channelString);
socket.emit('SubRemove', { subs: [channelString] });
channelToSubscription.delete(channelString);
break;
}
}
}
}
Step 4. Handle updates
CryptoCompare might require an API key to request some of their stream data. At the time of writing this tutorial, no key is required, and the code provided still works. If you get an error message, visit the CryptoCompare WebSockets documentation for more information.
The responses for requests look like this:
0~Bitfinex~BTC~USD~2~335394436~1548837377~0.36~3504.1~1261.4759999999999~1f
To handle updates coming from the WebSocket:
Implement the following function in
streaming.js
:/chart‑project/src/streaming.js// ...
socket.on('m', data => {
console.log('[socket] Message:', data);
const [
eventTypeStr,
exchange,
fromSymbol,
toSymbol,
,
,
tradeTimeStr,
,
tradePriceStr,
] = data.split('~');
if (parseInt(eventTypeStr) !== 0) {
// Skip all non-trading events
return;
}
const tradePrice = parseFloat(tradePriceStr);
const tradeTime = parseInt(tradeTimeStr);
const channelString = `0~${exchange}~${fromSymbol}~${toSymbol}`;
const subscriptionItem = channelToSubscription.get(channelString);
if (subscriptionItem === undefined) {
return;
}
const lastDailyBar = subscriptionItem.lastDailyBar;
let bar = {
...lastDailyBar,
high: Math.max(lastDailyBar.high, tradePrice),
low: Math.min(lastDailyBar.low, tradePrice),
close: tradePrice,
};
console.log('[socket] Update the latest bar by price', tradePrice);
subscriptionItem.lastDailyBar = bar;
// Send data to every subscriber of that symbol
subscriptionItem.handlers.forEach(handler => handler.callback(bar));
});Adjust the
getBars
method indatafeed.js
to save the last bar data for the current symbol./chart‑project/src/datafeed.js//...
data.Data.forEach( ... );
if (firstDataRequest) {
lastBarsCache.set(symbolInfo.full_name, { ...bars[bars.length - 1] });
}
console.log(`[getBars]: returned ${bars.length} bar(s)`);
// ...Add
getNextDailyBarTime
function tostreaming.js
./chart‑project/src/streaming.jsfunction getNextDailyBarTime(barTime) {
const date = new Date(barTime * 1000);
date.setDate(date.getDate() + 1);
return date.getTime() / 1000;
}CryptoCompare API provides a streaming of ticks, not bars. So, you need to check that the new trade is related to the new daily bar.
Note that you might need a more comprehensive check for the production version.
Adjust the
socket.on
listener instreaming.js
.socket.on('m', data => {
//...
const lastDailyBar = subscriptionItem.lastDailyBar;
const nextDailyBarTime = getNextDailyBarTime(lastDailyBar.time);
let bar;
if (tradeTime >= nextDailyBarTime) {
bar = {
time: nextDailyBarTime,
open: tradePrice,
high: tradePrice,
low: tradePrice,
close: tradePrice,
};
console.log('[socket] Generate new bar', bar);
} else {
bar = {
...lastDailyBar,
high: Math.max(lastDailyBar.high, tradePrice),
low: Math.min(lastDailyBar.low, tradePrice),
close: tradePrice,
};
console.log('[socket] Update the latest bar by price', tradePrice);
}
subscriptionItem.lastDailyBar = bar;
//...
});
Result
🎉 Congrats! At this point, you have implemented a datafeed with searching and resolving symbols, loading historical data, and providing real-time data updates via WebSocket.
Now you can run npx serve
from the chart-project
folder (if you have not already done it before)
and check how the implementation works.
Complete code
Click the following sections to reveal the complete code for the examples at this stage of the tutorial.
index.html
datafeed.js
streaming.js
What's next?
We hope this tutorial helped you understand the essentials of the Datafeed API and how to work with it. If you want to dive deeper into the details, we recommend checking out the following articles:
- Widget Constructor: get a better understanding of the Widget Constructor's capabilities and settings.
- Datafeed Common Issues: explore common issues that you might face when implementing the Datafeed API.
- Customization Overview: learn how to customize UI elements and chart behavior.