This content originally appeared on DEV Community and was authored by Twilight
This article guides you through creating a WebSocket client in Flutter for real-time data. We’ll start with the basics and improve it step by step until it’s solid and reliable. We’re using Binance Market WebSocket as an example (think of it like a live crypto price feed), but the real focus is WebSocket—connecting, fixing errors, and managing data streams. Let’s dive in!
Part 1: Starting with a Basic Connection
First, we need a way to connect to a WebSocket server. Since real-time data can arrive over different transports (WebSocket, MQTT, and so on), we define a StreamingService interface to keep things flexible. Here, we implement it with WebSocket using the web_socket_channel package, connecting to Binance’s endpoint (wss://stream.binance.com:9443/ws).
Here’s the simple starting code:
import 'package:web_socket_channel/web_socket_channel.dart';

abstract class StreamingService {
  void connect();
}

class WebsocketService implements StreamingService {
  WebsocketService({required this.wsUrl});

  final String wsUrl;
  WebSocketChannel? _channel;

  @override
  void connect() {
    // A channel already exists: do not open a second connection.
    if (_channel != null) return;
    _channel = WebSocketChannel.connect(Uri.parse(wsUrl));
    _channel?.stream.listen(
      _onEvent,
      onError: _onError,
      onDone: _onDone,
    );
  }

  // Called for every message the server sends.
  void _onEvent(dynamic event) {
    print("Event: $event");
  }

  // Called when the stream reports an error.
  void _onError(dynamic error) {
    print("Error: $error");
  }

  // Called once when the connection is closed.
  void _onDone() {
    print("Connection closed");
  }
}
This code connects and listens for data. _onEvent prints incoming messages, _onError logs errors, and _onDone notes when the connection stops. It’s simple—just enough to see Binance data flowing—but it’s fragile. If the connection drops or fails, it won’t recover. Let’s fix that next.
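To see how the three callbacks behave without touching the network, here is a minimal sketch that swaps the WebSocket for a local StreamController. The listenLikeWebsocket helper and the sample events are hypothetical and not part of the service above; the sketch only illustrates the order in which the data, error, and done callbacks fire.

```dart
import 'dart:async';

/// Hypothetical helper: attaches the same three callbacks the service uses
/// and records what each one receives, in order.
Future<List<String>> listenLikeWebsocket(Stream<dynamic> stream) {
  final log = <String>[];
  final done = Completer<List<String>>();
  stream.listen(
    (event) => log.add('Event: $event'),
    onError: (Object error) => log.add('Error: $error'),
    onDone: () {
      log.add('Connection closed');
      done.complete(log);
    },
  );
  return done.future;
}

Future<void> main() async {
  final controller = StreamController<dynamic>();
  final result = listenLikeWebsocket(controller.stream);

  controller.add('{"e":"24hrMiniTicker"}'); // a message arrives
  controller.addError('network glitch'); // an error; the stream stays open
  await controller.close(); // the connection ends

  (await result).forEach(print);
}
```

Note that an error does not end the stream by itself: the done callback only runs once the stream is closed, which is why the service needs both _onError and _onDone.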
Part 2: Keeping the Connection Alive with Ping-Pong
The basic setup from Part 1 has a weakness: WebSocket connections can die if they’re idle too long. Binance helps us here by sending a “ping” message to check if we’re still alive. We’ll reply with a “pong” to keep the connection going—a bit like a heartbeat.
To improve this, we’ll:
- Check incoming events for a ping using decodedEvent.containsKey('ping').
- Send a pong response.
Here’s the updated code:
// Requires: import 'dart:convert';
class WebsocketService implements StreamingService {
  /* Same as before */

  void _onEvent(dynamic event) {
    if (event is! String) return;
    final decodedEvent = jsonDecode(event);
    if (decodedEvent is Map<String, dynamic>) {
      if (decodedEvent.containsKey('ping')) {
        _handlePingResponse();
        return;
      }
    }
    print("Event: $event");
  }

  // Reply to a ping with a pong that carries the current timestamp.
  void _handlePingResponse() {
    _channel?.sink.add(jsonEncode({'pong': DateTime.now().millisecondsSinceEpoch}));
  }
}
What’s new?
- jsonDecode(event) turns the raw message into a map we can check.
- containsKey('ping') spots Binance’s ping message.
- _handlePingResponse() sends a pong with a timestamp, like saying, “Hey, I’m still here!”
This is smart—it keeps the connection alive without extra work. But if it drops anyway, we’re still stuck. Let’s add reconnect logic next.
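The ping check can also be pulled out into a pure function, which makes it easy to try in isolation. This is only a sketch: pongFor and its nowMs parameter are hypothetical names, and the FormatException guard is an addition that the service code above does not have.

```dart
import 'dart:convert';

/// Returns the JSON pong to send back if [rawEvent] is a ping, else null.
/// [nowMs] is injectable (hypothetical parameter) so the result is testable.
String? pongFor(dynamic rawEvent, {int? nowMs}) {
  if (rawEvent is! String) return null;
  Object? decoded;
  try {
    decoded = jsonDecode(rawEvent);
  } on FormatException {
    return null; // not JSON at all: ignore it rather than crash
  }
  if (decoded is Map<String, dynamic> && decoded.containsKey('ping')) {
    return jsonEncode({'pong': nowMs ?? DateTime.now().millisecondsSinceEpoch});
  }
  return null;
}

void main() {
  print(pongFor('{"ping": 1700000000000}', nowMs: 42)); // {"pong":42}
  print(pongFor('{"e": "24hrMiniTicker"}')); // null
  print(pongFor('not json')); // null
}
```

Inside the service, the returned string would simply be passed to _channel?.sink.add when it is not null.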
Part 3: Adding Reconnect Logic
Part 2 keeps the connection alive during quiet times, but what if the network fails or Binance cuts us off? The old code just logs “Error” or “Connection closed” and gives up. We need it to fight back by reconnecting.
Here’s how we’ll improve it:
- Add isConnected to check the connection status.
- Use _reconnectAttempts to cap retries and avoid an infinite reconnect loop.
- Check error types in _onError and close codes in _onDone to decide when to reconnect.
- Create reconnect() and disconnect() functions for control.
Here’s the code:
// Requires: import 'dart:io'; (SocketException, HttpException)
class WebsocketService implements StreamingService {
  /* Same as before */

  int _reconnectAttempts = 0;

  bool get isConnected => _channel != null && _channel!.closeCode == null;

  void _onError(dynamic error) {
    if (error is WebSocketChannelException || error is SocketException) {
      // Transient network problem: worth retrying.
      print("Temporary error detected");
      reconnect();
    } else if (error is HttpException && error.message.contains("403")) {
      print("Access denied (403 Forbidden). Check API permissions.");
    } else if (error is HttpException && error.message.contains("400")) {
      print("Bad request (400). Check the request format.");
    } else {
      print("Unknown error: $error");
    }
  }

  void _onDone() {
    // disconnect() clears _channel, so a null channel here means the close
    // was deliberate or a reconnect is already underway: do not retry again.
    if (_channel == null) return;
    print("Connection closed: ${_channel?.closeReason ?? 'Unknown reason'}");
    // 1000 is a normal closure; any other code (or none) is unexpected.
    if (_channel?.closeCode != 1000) {
      reconnect();
    }
  }

  Future<void> reconnect() async {
    disconnect();
    if (_reconnectAttempts >= 5) {
      print("Maximum reconnection attempts reached");
      return;
    }
    print("Attempting to reconnect (${_reconnectAttempts + 1}/5)...");
    _reconnectAttempts++;
    // Linear backoff: wait 2s, 4s, 6s, ... before each new attempt.
    await Future.delayed(Duration(seconds: 2 * _reconnectAttempts));
    connect();
  }

  void disconnect() {
    _channel?.sink.close();
    _channel = null;
  }
}
What’s new and clever?
- isConnected uses _channel!.closeCode == null to tell if we’re live: simple but effective.
- _onError checks error types: WebSocketChannelException or SocketException means a glitch we can retry, while 403 or 400 means stop and fix something.
- _onDone only reconnects if closeCode != 1000 (1000 means normal closure).
- reconnect() waits longer on each try (2s, 4s, and so on up to 10s) and stops after 5 attempts.
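The retry rules above can be written as small pure functions, which makes the schedule easy to inspect. This is a sketch, not the service’s code: linearDelay mirrors the 2 * attempts delay from reconnect(), while exponentialDelay and shouldReconnect are hypothetical helpers added for comparison.

```dart
/// Linear backoff as in reconnect(): 2s, 4s, 6s, ... for attempt 1, 2, 3, ...
Duration linearDelay(int attempt) => Duration(seconds: 2 * attempt);

/// Hypothetical alternative: exponential backoff capped at [maxSeconds].
Duration exponentialDelay(int attempt, {int maxSeconds = 30}) {
  final seconds = 1 << attempt; // 2, 4, 8, 16, ...
  return Duration(seconds: seconds > maxSeconds ? maxSeconds : seconds);
}

/// Retry on anything except a normal closure (1000), up to [maxAttempts].
bool shouldReconnect(int? closeCode, int attemptsSoFar, {int maxAttempts = 5}) =>
    closeCode != 1000 && attemptsSoFar < maxAttempts;

void main() {
  for (var attempt = 1; attempt <= 5; attempt++) {
    print('attempt $attempt: linear ${linearDelay(attempt).inSeconds}s, '
        'exponential ${exponentialDelay(attempt).inSeconds}s');
  }
  print(shouldReconnect(1006, 0)); // true: abnormal closure
  print(shouldReconnect(1000, 0)); // false: normal closure
  print(shouldReconnect(null, 5)); // false: attempts exhausted
}
```

Linear backoff is fine for five attempts; an exponential schedule with a cap is a common choice when retries may continue for longer.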
Now it’s tough, but we can’t talk to Binance yet. Let’s add requests.
Part 4: Sending Requests
So far, we only listen. Let’s send requests to subscribe to one of Binance’s streams (for example, the Individual Symbol Mini Ticker Stream).
Improvements:
- Use SubscribeStreamRequest and UnsubscribeStreamRequest classes for clean code.
- Pipe data to a StreamController, using EventTypes for organization.
- Add _sendRequest to send messages.
Here’s the code:
// Requires: import 'dart:async'; and import 'dart:convert';
class WebsocketService implements StreamingService {
  /* Same as before */

  final _symbolMiniTickerStreamController =
      StreamController<SymbolMiniTickerEvent>.broadcast();

  void _onEvent(dynamic event) {
    if (event is! String) return;
    final decodedEvent = jsonDecode(event);
    if (decodedEvent is Map<String, dynamic>) {
      if (decodedEvent.containsKey('ping')) {
        _handlePingResponse();
        return;
      }
      // Route each payload to its stream based on the event type field 'e'.
      final eventType = decodedEvent['e'];
      switch (eventType) {
        case EventTypes.symbolMiniTicker:
          _symbolMiniTickerStreamController.sink.add(SymbolMiniTickerEvent.fromMap(decodedEvent));
          break;
      }
    }
  }

  Future<bool> _sendRequest(Map<String, dynamic> request) async {
    if (!isConnected) {
      print("WebSocket is not connected. Cannot send request: $request");
      return false;
    }
    _channel?.sink.add(jsonEncode(request));
    return true; // Assume success
  }

  /// =============================== Individual Symbol Mini Ticker Streams ===============================
  Stream<SymbolMiniTickerEvent> get symbolMiniTickerStream => _symbolMiniTickerStreamController.stream;

  Future<bool> subscribeSymbolMiniTickerStream({required List<String> symbols}) async {
    final request = SubscribeStreamRequest(
      streamNames: symbols.map((symbol) => "${symbol.toLowerCase()}@miniTicker").toList(),
    );
    return _sendRequest(request.toMap());
  }

  Future<bool> unsubscribeSymbolMiniTickerStream({required List<String> symbols}) async {
    final request = UnsubscribeStreamRequest(
      streamNames: symbols.map((symbol) => "${symbol.toLowerCase()}@miniTicker").toList(),
    );
    return _sendRequest(request.toMap());
  }
}

class EventTypes {
  static const String symbolMiniTicker = '24hrMiniTicker';
}

class SubscribeStreamRequest {
  SubscribeStreamRequest({required this.streamNames});

  final List<String> streamNames;

  Map<String, dynamic> toMap() {
    return {
      "id": DateTime.now().millisecondsSinceEpoch,
      "method": "SUBSCRIBE",
      "params": streamNames,
    };
  }
}

// Mirrors SubscribeStreamRequest; only the method name differs.
class UnsubscribeStreamRequest {
  UnsubscribeStreamRequest({required this.streamNames});

  final List<String> streamNames;

  Map<String, dynamic> toMap() {
    return {
      "id": DateTime.now().millisecondsSinceEpoch,
      "method": "UNSUBSCRIBE",
      "params": streamNames,
    };
  }
}
What’s new?
- _sendRequest sends messages like {"method": "SUBSCRIBE", "params": ["btcusdt@miniTicker"]}; it assumes success for now.
- subscribeSymbolMiniTickerStream and unsubscribeSymbolMiniTickerStream use clean request classes and build stream names with "${symbol.toLowerCase()}@miniTicker".
- _symbolMiniTickerStreamController (a StreamController) lets us stream data to the app.
- EventTypes keeps event names organized.
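The code above relies on a SymbolMiniTickerEvent class with a fromMap constructor that the article does not show. Here is one possible sketch of it. The field selection and the short keys ('E', 's', 'c', 'o', 'h', 'l') are assumptions based on the mini ticker payload format, so check them against Binance’s API documentation before relying on them.

```dart
import 'dart:convert';

/// Hypothetical model for the events pushed into the mini ticker stream.
/// The key names are assumed from Binance's payload; prices arrive as strings.
class SymbolMiniTickerEvent {
  SymbolMiniTickerEvent({
    required this.eventTime,
    required this.symbol,
    required this.closePrice,
    required this.openPrice,
    required this.highPrice,
    required this.lowPrice,
  });

  factory SymbolMiniTickerEvent.fromMap(Map<String, dynamic> map) {
    return SymbolMiniTickerEvent(
      eventTime: DateTime.fromMillisecondsSinceEpoch(map['E'] as int),
      symbol: map['s'] as String,
      closePrice: double.parse(map['c'] as String),
      openPrice: double.parse(map['o'] as String),
      highPrice: double.parse(map['h'] as String),
      lowPrice: double.parse(map['l'] as String),
    );
  }

  final DateTime eventTime;
  final String symbol;
  final double closePrice;
  final double openPrice;
  final double highPrice;
  final double lowPrice;
}

void main() {
  // Illustrative payload, not captured from a live connection.
  const raw = '{"e":"24hrMiniTicker","E":1672515782136,"s":"BNBBTC",'
      '"c":"0.0025","o":"0.0010","h":"0.0025","l":"0.0010"}';
  final event =
      SymbolMiniTickerEvent.fromMap(jsonDecode(raw) as Map<String, dynamic>);
  print('${event.symbol} close=${event.closePrice}'); // BNBBTC close=0.0025
}
```

With a model like this in place, the app side would call subscribeSymbolMiniTickerStream(symbols: ['BTCUSDT']) and listen on symbolMiniTickerStream to receive typed events.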
It’s cool, but _sendRequest doesn’t confirm if it worked, and reconnects lose subscriptions. Let’s make it smarter.
Part 5: Making Requests Smarter with ACK and Stream Management
Part 4 sends requests, but it’s blind—we don’t know if Binance accepted them, and reconnects don’t resubscribe. Let’s fix that with:
- Add _pendingRequests to track request success via ACKs.
- Add _activeStreams to remember active stream subscriptions.
- Add _handleAckResponse to process responses.
- Resubscribe in reconnect() and reset states in disconnect().
// Requires: import 'dart:async'; and import 'dart:convert';
class WebsocketService implements StreamingService {
  /* Same as before */

  // Requests waiting for an ACK, keyed by request id.
  final Map<int, Completer<bool>> _pendingRequests = {};
  // Stream names we are currently subscribed to.
  final Set<String> _activeStreams = {};

  void _onEvent(dynamic event) {
    if (event is! String) return;
    final decodedEvent = jsonDecode(event);
    if (decodedEvent is Map<String, dynamic>) {
      if (decodedEvent.containsKey('ping')) {
        _handlePingResponse();
        return;
      }
      // Responses to our own requests carry the request id.
      if (decodedEvent.containsKey('id')) {
        _handleAckResponse(decodedEvent);
        return;
      }
      final eventType = decodedEvent['e'];
      switch (eventType) {
        case EventTypes.symbolMiniTicker:
          _symbolMiniTickerStreamController.sink.add(SymbolMiniTickerEvent.fromMap(decodedEvent));
          break;
      }
    }
  }

  void _handleAckResponse(Map<String, dynamic> response) {
    final requestId = response['id'];
    final completer = _pendingRequests.remove(requestId);
    if (completer != null) {
      if (response.containsKey('code')) {
        // An error response carries a code and a message.
        completer.complete(false);
        print("Error: ${response['msg']} (Code: ${response['code']})");
      } else if (response.containsKey('result')) {
        // A successful request is acknowledged with "result": null.
        completer.complete(response['result'] == null);
      } else {
        completer.complete(false);
      }
    }
  }

  Future<bool> _sendRequest(Map<String, dynamic> request) async {
    if (!isConnected) {
      print("WebSocket is not connected. Cannot send request: $request");
      return false;
    }
    final completer = Completer<bool>();
    _pendingRequests[request['id']] = completer;
    _channel?.sink.add(jsonEncode(request));
    // Wait until _handleAckResponse (or disconnect) completes the request.
    final isSuccess = await completer.future;
    if (isSuccess && request.containsKey('params')) {
      final streamNames = request['params'];
      if (request['method'] == 'SUBSCRIBE') {
        _activeStreams.addAll(streamNames);
      } else if (request['method'] == 'UNSUBSCRIBE') {
        _activeStreams.removeAll(streamNames);
      }
    }
    return isSuccess;
  }

  Future<void> reconnect() async {
    /* Same as before */
    if (isConnected) {
      _reconnectAttempts = 0;
      // Restore every subscription that was active before the drop.
      if (_activeStreams.isNotEmpty) {
        final request = SubscribeStreamRequest(streamNames: _activeStreams.toList());
        await _sendRequest(request.toMap());
      }
    }
  }

  void disconnect() {
    // Fail every request that is still waiting so no caller hangs forever.
    for (final completer in _pendingRequests.values) {
      completer.complete(false);
    }
    _pendingRequests.clear();
    _channel?.sink.close();
    _channel = null;
  }

  void close() {
    disconnect();
    _symbolMiniTickerStreamController.close();
  }
}
What’s new?
- _pendingRequests tracks requests with Completer<bool>: each request waits for Binance’s ACK, like asking “Hey, did this work?”
- _handleAckResponse checks response['result'] == null for success (Binance’s ACK convention).
- _activeStreams remembers names like "btcusdt@miniTicker" and updates with addAll or removeAll.
- reconnect() resubscribes when _activeStreams.isNotEmpty, so streams are no longer lost.
- disconnect() completes every pending Completer in _pendingRequests with false, so callers are not left waiting and unacknowledged streams are never added to _activeStreams.
- close() shuts down _symbolMiniTickerStreamController so it doesn’t leak.
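One gap remains in the ACK flow: if the server never answers, the awaited Completer never completes. Below is a minimal sketch of the same bookkeeping with a timeout added. The PendingRequests class, its method names, and the 5-second default are hypothetical, and the error payload in main is only illustrative.

```dart
import 'dart:async';

/// Hypothetical helper that tracks requests awaiting an ACK, keyed by id.
class PendingRequests {
  final Map<int, Completer<bool>> _pending = {};

  /// Registers [id] and returns a future that resolves to the ACK result,
  /// or to false if no ACK arrives within [timeout].
  Future<bool> register(int id,
      {Duration timeout = const Duration(seconds: 5)}) {
    final completer = Completer<bool>();
    _pending[id] = completer;
    return completer.future.timeout(timeout, onTimeout: () {
      _pending.remove(id); // stop tracking a request that never got an ACK
      return false;
    });
  }

  /// Feeds in a decoded server response; applies the same rules as
  /// _handleAckResponse (an error code fails, "result": null succeeds).
  void handleAck(Map<String, dynamic> response) {
    final completer = _pending.remove(response['id']);
    if (completer == null) return; // unknown or already timed-out id
    final ok = !response.containsKey('code') &&
        response.containsKey('result') &&
        response['result'] == null;
    completer.complete(ok);
  }
}

Future<void> main() async {
  final pending = PendingRequests();

  final acked = pending.register(1);
  pending.handleAck({'result': null, 'id': 1});
  print(await acked); // true

  final rejected = pending.register(2);
  pending.handleAck({'code': 2, 'msg': 'Invalid request', 'id': 2});
  print(await rejected); // false

  final lost = pending.register(3, timeout: const Duration(milliseconds: 50));
  print(await lost); // false: no ACK arrived in time
}
```

In the service, the same idea would amount to awaiting completer.future.timeout(...) inside _sendRequest instead of awaiting the bare future.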
Wrapping Up
We’ve built a WebSocket client from scratch: starting with a basic connection, keeping it alive with ping-pong, adding reconnect logic, sending requests, and making them smart with ACKs and stream management. Binance Market WebSocket was just our example; the real star is the WebSocket setup. Use this code, tweak it for your needs, and handle real-time data like a pro!
Twilight | Sciencx (2025-02-22T09:59:33+00:00) Building a WebSocket Client in Flutter: From Zero to Hero. Retrieved from https://www.scien.cx/2025/02/22/building-a-websocket-client-in-flutter-from-zero-to-hero/