Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions packages/firebase_data_connect/firebase_data_connect/.metadata
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# This file tracks properties of this Flutter project.
# Used by Flutter tool to assess capabilities and perform upgrades etc.
#
# This file should be version controlled and should not be manually edited.

version:
revision: "adc901062556672b4138e18a4dc62a4be8f4b3c2"
channel: "stable"

project_type: app

# Tracks metadata for the flutter migrate command
migration:
platforms:
- platform: root
create_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2
base_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2
- platform: macos
create_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2
base_revision: adc901062556672b4138e18a4dc62a4be8f4b3c2

# User provided section

# List of Local paths (relative to this file) that should be
# ignored by the migrate tool.
#
# Files that are not part of the templates will be ignored by default.
unmanaged_files:
- 'lib/main.dart'
- 'ios/Runner.xcodeproj/project.pbxproj'
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include: package:flutter_lints/flutter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,11 @@ class EntityNode {
srcListMap.forEach((key, value) {
List<EntityNode> enodeList = [];
List<dynamic> jsonList = value as List<dynamic>;
jsonList.forEach((jsonObj) {
for (var jsonObj in jsonList) {
Map<String, dynamic> jmap = jsonObj as Map<String, dynamic>;
EntityNode en = EntityNode.fromJson(jmap, cacheProvider);
enodeList.add(en);
});
}
objLists?[key] = enodeList;
});
}
Expand Down Expand Up @@ -367,9 +367,9 @@ class EntityNode {
if (nestedObjectLists != null) {
nestedObjectLists!.forEach((key, edoList) {
List<Map<String, dynamic>> jsonList = [];
edoList.forEach((edo) {
for (var edo in edoList) {
jsonList.add(edo.toJson(mode: mode));
});
}
jsonData[key] = jsonList;
});
}
Expand All @@ -396,9 +396,9 @@ class EntityNode {
Map<String, dynamic> nestedObjectListsJson = {};
nestedObjectLists!.forEach((key, edoList) {
List<Map<String, dynamic>> jsonList = [];
edoList.forEach((edo) {
for (var edo in edoList) {
jsonList.add(edo.toJson(mode: mode));
});
}
nestedObjectListsJson[key] = jsonList;
});
jsonData[listsKey] = nestedObjectListsJson;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,13 @@ abstract class DataConnectTransport {
Variables? vars,
String? token,
);

/// Invokes corresponding stream query endpoint.
Stream<ServerResponse> invokeStreamQuery<Data, Variables>(
String queryName,
Deserializer<Data> deserializer,
Serializer<Variables> serializer,
Variables? vars,
String? token,
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ class DataConnectError extends FirebaseException {

/// Error thrown when an operation is partially successful.
class DataConnectOperationError<T> extends DataConnectError {
DataConnectOperationError(
DataConnectErrorCode code, String message, this.response)
: super(code, message);
DataConnectOperationError(super.code, String super.message, this.response);
final DataConnectOperationFailureResponse<T> response;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 Google LLC
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,15 +53,43 @@ abstract class OperationRef<Data, Variables> {
);
Variables? variables;
String operationName;
DataConnectTransport _transport;
final DataConnectTransport _transport;
Deserializer<Data> deserializer;
Serializer<Variables> serializer;
String? _lastToken;

FirebaseDataConnect dataConnect;

Future<OperationResult<Data, Variables>> execute(
{QueryFetchPolicy fetchPolicy = QueryFetchPolicy.preferCache});
static dynamic _sortKeys(dynamic value) {
if (value is Map) {
final sortedMap = <String, dynamic>{};
final sortedKeys = value.keys.toList()..sort();
for (final key in sortedKeys) {
sortedMap[key.toString()] = _sortKeys(value[key]);
}
return sortedMap;
} else if (value is List) {
return value.map(_sortKeys).toList();
}
return value;
}

static String createOperationId<Variables>(String operationName,
Variables? vars, Serializer<Variables>? serializer) {
if (vars != null && serializer != null) {
try {
final decoded = jsonDecode(serializer(vars));
final sortedStr = jsonEncode(_sortKeys(decoded));
return '$operationName::$sortedStr';
} catch (_) {
return '$operationName::${serializer(vars)}';
}
} else {
return operationName;
}
}

Future<OperationResult<Data, Variables>> execute();

Future<bool> _shouldRetry() async {
String? newToken;
Expand Down Expand Up @@ -184,15 +212,6 @@ class QueryManager {
return streamController;
}

static String createQueryId<QueryVariables>(String queryName,
QueryVariables? vars, Serializer<QueryVariables> varSerializer) {
if (vars != null) {
return '$queryName::${varSerializer(vars)}';
} else {
return queryName;
}
}

void dispose() {
_impactedQueriesSubscription?.cancel();
}
Expand All @@ -216,7 +235,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
variables,
);

QueryManager _queryManager;
final QueryManager _queryManager;

@override
Future<QueryResult<Data, Variables>> execute(
Expand All @@ -240,7 +259,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
}

String get _queryId =>
QueryManager.createQueryId(operationName, variables, serializer);
OperationRef.createOperationId(operationName, variables, serializer);

Future<QueryResult<Data, Variables>> _executeFromCache(
QueryFetchPolicy fetchPolicy) async {
Expand Down Expand Up @@ -311,9 +330,58 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
Stream<QueryResult<Data, Variables>> subscribe() {
_streamController ??= _queryManager.addQuery(this);

execute();
final stream =
_streamController!.stream.cast<QueryResult<Data, Variables>>();

// Return the stream to the caller, then execute fetches
Future.microtask(() {
if (dataConnect.cacheManager != null) {
_executeFromCache(QueryFetchPolicy.cacheOnly)
.then((_) {})
.catchError((err) {
log("Error fetching from cache during subscribe $err");
// Ignore cache misses here, server stream will provide latest data
});
}

// Initiate Web Socket stream
_streamFromServer();
});

return stream;
}

void _streamFromServer() async {
bool shouldRetry = await _shouldRetry();
try {
final stream = _transport.invokeStreamQuery<Data, Variables>(
operationName,
deserializer,
serializer,
variables,
_lastToken,
);

await for (final serverResponse in stream) {
if (dataConnect.cacheManager != null) {
await dataConnect.cacheManager!.update(_queryId, serverResponse);
}
Data typedData = _convertBodyJsonToData(serverResponse.data);

return _streamController!.stream.cast<QueryResult<Data, Variables>>();
QueryResult<Data, Variables> res =
QueryResult(dataConnect, typedData, DataSource.server, this);
publishResultToStream(res);
}
} on DataConnectError catch (e) {
if (shouldRetry &&
e.code == DataConnectErrorCode.unauthorized.toString()) {
_streamFromServer();
} else {
publishErrorToStream(e);
}
} catch (e) {
publishErrorToStream(e as Error);
}
}

void publishResultToStream(QueryResult<Data, Variables> result) {
Expand All @@ -322,7 +390,7 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {
}
}

void publishErrorToStream(Error err) {
void publishErrorToStream(Object err) {
if (_streamController != null) {
_streamController?.addError(err);
}
Expand All @@ -331,24 +399,16 @@ class QueryRef<Data, Variables> extends OperationRef<Data, Variables> {

class MutationRef<Data, Variables> extends OperationRef<Data, Variables> {
MutationRef(
FirebaseDataConnect dataConnect,
String operationName,
DataConnectTransport transport,
Deserializer<Data> deserializer,
Serializer<Variables> serializer,
Variables? variables,
) : super(
dataConnect,
operationName,
transport,
deserializer,
serializer,
variables,
);
super.dataConnect,
super.operationName,
super.transport,
super.deserializer,
super.serializer,
super.variables,
);

@override
Future<OperationResult<Data, Variables>> execute(
{QueryFetchPolicy fetchPolicy = QueryFetchPolicy.serverOnly}) async {
Future<OperationResult<Data, Variables>> execute() async {
bool shouldRetry = await _shouldRetry();
try {
// Logic below is duplicated due to the fact that `executeOperation` returns
Expand Down
Loading
Loading