Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
27 : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
28 : /// automatically. The initial
29 : /// event list will be retrieved when created by the `room.getTimeline()` method.
30 :
31 : class Timeline {
32 : final Room room;
33 39 : List<Event> get events => chunk.events;
34 :
35 : /// Map of event ID to map of type to set of aggregated events
36 : final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
37 :
38 : final void Function()? onUpdate;
39 : final void Function(int index)? onChange;
40 : final void Function(int index)? onInsert;
41 : final void Function(int index)? onRemove;
42 : final void Function()? onNewEvent;
43 :
44 : StreamSubscription<Event>? timelineSub;
45 : StreamSubscription<Event>? historySub;
46 : StreamSubscription<SyncUpdate>? roomSub;
47 : StreamSubscription<String>? sessionIdReceivedSub;
48 : StreamSubscription<String>? cancelSendEventSub;
49 : bool isRequestingHistory = false;
50 : bool isRequestingFuture = false;
51 :
52 : bool allowNewEvent = true;
53 : bool isFragmentedTimeline = false;
54 :
55 : final Map<String, Event> _eventCache = {};
56 :
57 : TimelineChunk chunk;
58 :
/// Looks up the event with the given [id].
///
/// The events already loaded into this timeline are checked first, then the
/// cache of previously requested events. If neither contains the event it is
/// requested via [Room.getEventById] and, when found, stored in the cache.
/// Returns `null` if that request yields no event.
Future<Event?> getEventById(String id) async {
  final loaded = events.firstWhereOrNull((event) => event.eventId == id);
  if (loaded != null) return loaded;

  final cached = _eventCache[id];
  if (cached != null) return cached;

  final fetched = await room.getEventById(id);
  if (fetched != null) {
    _eventCache[id] = fetched;
  }
  return fetched;
}
72 :
73 : // When fetching history, we will collect them into the `_historyUpdates` set
74 : // first, and then only process all events at once, once we have the full history.
75 : // This ensures that the entire history fetching triggers `onUpdate` only *once*,
76 : // even if /sync's complete while history is being processed.
77 : bool _collectHistoryUpdates = false;
78 :
79 : // We confirmed, that there are no more events to load from the database.
80 : bool _fetchedAllDatabaseEvents = false;
81 :
/// Whether requesting more history can still yield events.
///
/// This is `true` while the timeline is empty or while the database has not
/// been confirmed to be exhausted. Afterwards it is only `true` if the room
/// has a `prev_batch` token and the oldest loaded event is not the room
/// creation event.
bool get canRequestHistory {
  if (events.isEmpty || !_fetchedAllDatabaseEvents) return true;
  final reachedRoomCreation = events.last.type == EventTypes.RoomCreate;
  return room.prev_batch != null && !reachedRoomCreation;
}
87 :
/// Requests more previous events for this timeline.
///
/// [historyCount] is the maximum number of events to load. [filter] is an
/// optional [StateFilter] (for example to restrict event types such as
/// [EventTypes.Message]) that is forwarded to the underlying request.
///
/// Does nothing if a history request is already running. This method does
/// not return a value; results are delivered through the timeline callbacks.
Future<void> requestHistory({
  int historyCount = Room.defaultHistoryCount,
  StateFilter? filter,
}) async {
  if (isRequestingHistory) {
    return;
  }

  isRequestingHistory = true;
  try {
    await _requestEvents(
      direction: Direction.b,
      historyCount: historyCount,
      filter: filter,
    );
  } finally {
    // Always release the guard, even if the request (or a callback invoked
    // by it) throws; otherwise every later call would return immediately.
    isRequestingHistory = false;
  }
}
110 :
/// Whether newer events have to be requested explicitly.
///
/// This is the case exactly when [allowNewEvent] is `false`.
bool get canRequestFuture {
  return !allowNewEvent;
}
112 :
/// Requests more future (newer) events for this timeline.
///
/// [historyCount] is the maximum number of events to load. [filter] is an
/// optional [StateFilter] (for example to restrict event types such as
/// [EventTypes.Message]) that is forwarded to the underlying request.
///
/// Does nothing if [allowNewEvent] is `true` or if a future request is
/// already running. This method does not return a value.
Future<void> requestFuture({
  int historyCount = Room.defaultHistoryCount,
  StateFilter? filter,
}) async {
  if (allowNewEvent) {
    // We shouldn't force adding new events if they will automatically be
    // added.
    return;
  }

  if (isRequestingFuture) return;
  isRequestingFuture = true;
  try {
    await _requestEvents(
      direction: Direction.f,
      historyCount: historyCount,
      filter: filter,
    );
  } finally {
    // Always release the guard. Without this, a failed request would leave
    // the flag set and block every subsequent call.
    isRequestingFuture = false;
  }
}
136 :
/// Loads up to [historyCount] more events in the given [direction].
///
/// Unless this is a fragmented timeline, the database is queried first.
/// Events found there are added to [events] (appended for [Direction.b],
/// prepended otherwise) and reported through [onInsert]. If the database
/// yields nothing, the events are requested from the server: via
/// [getRoomEvents] for fragmented timelines, otherwise via
/// [Room.requestHistory] as long as the room has a `prev_batch` token.
///
/// [onUpdate] is called once before and once after loading. The request
/// flag that belongs to [direction] is cleared when loading ends, whether
/// it succeeded or not.
Future<void> _requestEvents({
  int historyCount = Room.defaultHistoryCount,
  required Direction direction,
  StateFilter? filter,
}) async {
  onUpdate?.call();

  try {
    // Look up for events in the database first. With fragmented view, we
    // should delete the database cache.
    final eventsFromStore = isFragmentedTimeline
        ? null
        : await room.client.database.getEventList(
            room,
            start: events.length,
            limit: historyCount,
          );

    if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
      for (final e in eventsFromStore) {
        addAggregatedEvent(e);
      }

      // Fetch all users from database we have got here. The sender ids are
      // snapshotted into a set first: the lookups below are awaited, so the
      // live `events` list may change meanwhile, and the set also covers the
      // freshly loaded events while avoiding repeated lookups per sender.
      final senderIds = <String>{
        for (final event in events) event.senderId,
        for (final event in eventsFromStore) event.senderId,
      };
      for (final senderId in senderIds) {
        if (room.getState(EventTypes.RoomMember, senderId) != null) {
          continue;
        }
        final dbUser = await room.client.database.getUser(senderId, room);
        if (dbUser != null) room.setState(dbUser);
      }

      if (direction == Direction.b) {
        final startIndex = events.length;
        events.addAll(eventsFromStore);
        final endIndex = events.length;
        for (var i = startIndex; i < endIndex; i++) {
          onInsert?.call(i);
        }
      } else {
        events.insertAll(0, eventsFromStore);
        // The new events occupy the indices 0..length-1, so report exactly
        // those (same order as the forward branch of `getRoomEvents`).
        final insertedCount = eventsFromStore.length;
        for (var i = 0; i < insertedCount; i++) {
          onInsert?.call(i);
        }
      }
    } else {
      _fetchedAllDatabaseEvents = true;
      Logs().i('No more events found in the store. Request from server...');

      if (isFragmentedTimeline) {
        await getRoomEvents(
          historyCount: historyCount,
          direction: direction,
          filter: filter,
        );
      } else {
        if (room.prev_batch == null) {
          Logs().i('No more events to request from server...');
        } else {
          await room.requestHistory(
            historyCount: historyCount,
            direction: direction,
            onHistoryReceived: () {
              // From here on incoming history events are batched: they must
              // not trigger `onUpdate` one by one.
              _collectHistoryUpdates = true;
            },
            filter: filter,
          );
        }
      }
    }
  } finally {
    _collectHistoryUpdates = false;
    // Only clear the guard of the direction that was actually requested, so
    // a finished forward request cannot unlock a running history request.
    if (direction == Direction.b) {
      isRequestingHistory = false;
    } else {
      isRequestingFuture = false;
    }
    onUpdate?.call();
  }
}
214 :
/// Requests events of this room from the server and adds them to [chunk].
///
/// [historyCount] is the maximum number of events to request. [direction]
/// selects whether older ([Direction.b], the default) or newer
/// ([Direction.f]) events are requested; the request starts at the chunk's
/// `prevBatch` or `nextBatch` token respectively. [filter] allows you to
/// specify a [StateFilter]; if it is omitted, or its `lazyLoadMembers` is
/// unset, `lazyLoadMembers` is set to `true`.
///
/// Older events are appended to the chunk's events, newer events are
/// prepended in reversed order, and each one is reported through
/// [onInsert]. Encrypted events are decrypted if possible, without updating
/// the database. [onUpdate] is called once at the end.
///
/// Returns the number of events contained in the server response.
Future<int> getRoomEvents({
  int historyCount = Room.defaultHistoryCount,
  Direction direction = Direction.b,
  StateFilter? filter,
}) async {
  // Ensure stateFilter is not null and set lazyLoadMembers to true if not
  // already set.
  filter ??= StateFilter(lazyLoadMembers: true);
  filter.lazyLoadMembers ??= true;

  final isBackwards = direction == Direction.b;

  final resp = await room.client.getRoomEvents(
    room.id,
    direction,
    from: isBackwards ? chunk.prevBatch : chunk.nextBatch,
    limit: historyCount,
    filter: jsonEncode(filter.toJson()),
  );

  if (resp.end == null) {
    Logs().w('We reached the end of the timeline');
  }

  // `start`/`end` of the response are relative to the request direction;
  // map them onto the chunk's direction-independent anchors.
  final newNextBatch = isBackwards ? resp.start : resp.end;
  final newPrevBatch = isBackwards ? resp.end : resp.start;

  final type =
      isBackwards ? EventUpdateType.history : EventUpdateType.timeline;

  if ((resp.state?.isEmpty ?? true) &&
      resp.start != resp.end &&
      newPrevBatch != null &&
      newNextBatch != null) {
    if (type == EventUpdateType.history) {
      Logs().w(
        '[nav] we can still request history prevBatch: $type $newPrevBatch',
      );
    } else {
      Logs().w(
        '[nav] we can still request timeline nextBatch: $type $newNextBatch',
      );
    }
  }

  final newEvents =
      resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();

  if (!allowNewEvent) {
    if (resp.start == resp.end ||
        (resp.end == null && direction == Direction.f)) {
      allowNewEvent = true;
    }

    if (allowNewEvent) {
      Logs().d('We now allow sync update into the timeline.');
      newEvents.addAll(
        await room.client.database.getEventList(room, onlySending: true),
      );
    }
  }

  // Try to decrypt encrypted events but don't update the database.
  if (room.encrypted && room.client.encryptionEnabled) {
    for (var i = 0; i < newEvents.length; i++) {
      if (newEvents[i].type == EventTypes.Encrypted) {
        newEvents[i] = await room.client.encryption!.decryptRoomEvent(
          newEvents[i],
        );
      }
    }
  }

  // Update chunk anchors and publish the new events.
  if (type == EventUpdateType.history) {
    chunk.prevBatch = newPrevBatch ?? '';

    final offset = chunk.events.length;

    chunk.events.addAll(newEvents);

    for (var i = 0; i < newEvents.length; i++) {
      onInsert?.call(i + offset);
    }
  } else {
    chunk.nextBatch = newNextBatch ?? '';
    chunk.events.insertAll(0, newEvents.reversed);

    for (var i = 0; i < newEvents.length; i++) {
      onInsert?.call(i);
    }
  }

  onUpdate?.call();
  return resp.chunk.length;
}
318 :
/// Creates a timeline for [room] backed by the given [chunk].
///
/// Subscribes to the client's timeline, history, sync and cancel-send
/// streams as well as the room's session-key stream, and registers all
/// events already contained in [chunk] as aggregated events. If [chunk] has
/// a non-empty `nextBatch` the timeline is treated as fragmented: new
/// events are not accepted and the database is never read.
///
/// Call [cancelSubscriptions] before dropping the object.
Timeline({
  required this.room,
  this.onUpdate,
  this.onChange,
  this.onInsert,
  this.onRemove,
  this.onNewEvent,
  required this.chunk,
}) {
  timelineSub = room.client.onTimelineEvent.stream.listen((event) {
    _handleEventUpdate(event, EventUpdateType.timeline);
  });
  historySub = room.client.onHistoryEvent.stream.listen((event) {
    _handleEventUpdate(event, EventUpdateType.history);
  });

  // If the timeline is limited we want to clear our events cache
  bool isLimitedForThisRoom(SyncUpdate sync) {
    final limited = sync.rooms?.join?[room.id]?.timeline?.limited;
    return limited == true;
  }

  roomSub = room.client.onSync.stream
      .where(isLimitedForThisRoom)
      .listen(_removeEventsNotInThisSync);

  sessionIdReceivedSub =
      room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
  cancelSendEventSub =
      room.client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);

  // we want to populate our aggregated events
  events.forEach(addAggregatedEvent);

  // A non-empty next batch token means we are using a fragmented timeline.
  final isFragmented = chunk.nextBatch != '';
  if (isFragmented) {
    allowNewEvent = false;
    isFragmentedTimeline = true;
    // fragmented timelines never read from the database.
    _fetchedAllDatabaseEvents = true;
  }
}
364 :
/// Removes the event whose sending was cancelled from the timeline.
///
/// If an event matching [eventId] is loaded, it is dropped from the
/// aggregation tree and from [events]; [onRemove] and [onUpdate] are
/// notified afterwards. Unknown ids are ignored.
void _cleanUpCancelledEvent(String eventId) {
  final index = _findEvent(event_id: eventId);
  final isLoaded = index < events.length;
  if (!isLoaded) return;

  removeAggregatedEvent(events[index]);
  events.removeAt(index);
  onRemove?.call(index);
  onUpdate?.call();
}
374 :
/// Removes all entries from [events] which are not in this SyncUpdate.
///
/// Only the timeline events of this room in the joined-rooms section of
/// [sync] are considered; if there are none, every event is removed.
void _removeEventsNotInThisSync(SyncUpdate sync) {
  final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
  // Materialise the ids once: `map` alone is lazy, so `contains` would walk
  // and re-map the whole sync chunk for every single timeline event.
  final keepEventIds = newSyncEvents.map((e) => e.eventId).toSet();
  events.removeWhere((e) => !keepEventIds.contains(e.eventId));
}
381 :
/// Cancels every stream subscription held by this timeline.
///
/// Don't forget to call this before you dismiss this object! The futures
/// returned by the individual `cancel` calls are intentionally not awaited.
void cancelSubscriptions() {
  final subscriptions = <StreamSubscription<Object?>?>[
    timelineSub,
    historySub,
    roomSub,
    sessionIdReceivedSub,
    cancelSendEventSub,
  ];
  for (final subscription in subscriptions) {
    // ignore: discarded_futures
    subscription?.cancel();
  }
}
395 :
/// Retries decryption of loaded events once the key for [sessionId] arrived.
///
/// Every loaded event that is still encrypted, flagged as badly encrypted
/// and refers to [sessionId] is decrypted again inside one database
/// transaction. Each retried event is re-registered as aggregated event and
/// reported through [onChange]. [onUpdate] fires once if at least one event
/// is no longer of the encrypted type afterwards.
void _sessionKeyReceived(String sessionId) async {
  var decryptedAny = false;

  Future<void> retryDecryption() async {
    final encryption = room.client.encryption;
    final canDecrypt = room.client.encryptionEnabled && encryption != null;
    if (!canDecrypt) return;

    for (var i = 0; i < events.length; i++) {
      final candidate = events[i];
      final waitsForThisSession = candidate.type == EventTypes.Encrypted &&
          candidate.messageType == MessageTypes.BadEncrypted &&
          candidate.content['session_id'] == sessionId;
      if (!waitsForThisSession) continue;

      events[i] = await encryption.decryptRoomEvent(
        candidate,
        store: true,
        updateType: EventUpdateType.history,
      );
      addAggregatedEvent(events[i]);
      onChange?.call(i);
      // Decryption may fail again; only a changed type counts as success.
      if (events[i].type != EventTypes.Encrypted) {
        decryptedAny = true;
      }
    }
  }

  await room.client.database.transaction(retryDecryption);
  if (decryptedAny) {
    onUpdate?.call();
  }
}
424 :
/// Requests the keys for the undecryptable events of this timeline.
///
/// An event qualifies if it is still encrypted, flagged as badly encrypted
/// and its content has `can_request_session` set to `true`. For each such
/// event that carries both a `session_id` and a `sender_key`, a key request
/// is handed to the key manager together with [tryOnlineBackup] and
/// [onlineKeyBackupOnly]. Nothing happens if the client has no encryption.
void requestKeys({
  bool tryOnlineBackup = true,
  bool onlineKeyBackupOnly = true,
}) {
  for (final event in events) {
    final isRequestable = event.type == EventTypes.Encrypted &&
        event.messageType == MessageTypes.BadEncrypted &&
        event.content['can_request_session'] == true;
    if (!isRequestable) continue;

    final sessionId = event.content.tryGet<String>('session_id');
    final senderKey = event.content.tryGet<String>('sender_key');
    if (sessionId == null || senderKey == null) continue;

    room.client.encryption?.keyManager.maybeAutoRequest(
      room.id,
      sessionId,
      senderKey,
      tryOnlineBackup: tryOnlineBackup,
      onlineKeyBackupOnly: onlineKeyBackupOnly,
    );
  }
}
448 :
/// Sets the read marker of the room.
///
/// If [eventId] is omitted, the first event in [events] whose status is
/// synced is used; if there is none, nothing happens. The same id is used
/// for the marker and for the `mRead` argument, and [public] is passed
/// through to [Room.setReadMarker].
Future<void> setReadMarker({String? eventId, bool? public}) async {
  final markerId = eventId ??
      events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
  if (markerId == null) return;
  await room.setReadMarker(markerId, mRead: markerId, public: public);
}
456 :
/// Returns the index of the first loaded event matching one of the ids.
///
/// An event matches if its event id or its transaction id equals either
/// [event_id] or [unsigned_txid] (whichever of them are non-null). If no
/// event matches, including the case that both arguments are `null`, the
/// length of [events] is returned.
int _findEvent({String? event_id, String? unsigned_txid}) {
  // Everything we search for; an event matches if any of its own ids is in
  // this set.
  final wantedIds = <String>{
    if (event_id != null) event_id,
    if (unsigned_txid != null) unsigned_txid,
  };

  final index = events.indexWhere((candidate) {
    if (wantedIds.contains(candidate.eventId)) return true;
    final txnid = candidate.transactionId;
    return txnid != null && wantedIds.contains(txnid);
  });

  return index == -1 ? events.length : index;
}
484 :
/// Removes every entry of [eventSet] that represents the same event as
/// [event].
///
/// An entry is removed if it matches the event id of [event] or, provided
/// [event] has unsigned data, its transaction id.
void _removeEventFromSet(Set<Event> eventSet, Event event) {
  bool representsSameEvent(Event candidate) {
    if (candidate.matchesEventOrTransactionId(event.eventId)) return true;
    if (event.unsigned == null) return false;
    return candidate.matchesEventOrTransactionId(event.transactionId);
  }

  eventSet.removeWhere(representsSameEvent);
}
493 :
/// Adds [event] to the aggregation tree of the event it relates to.
///
/// Events without a relationship type or related event id are ignored. An
/// older entry representing the same event is replaced. If the related
/// event is currently loaded in this timeline, [onChange] is called with
/// its index.
void addAggregatedEvent(Event event) {
  final relationshipType = event.relationshipType;
  final relationshipEventId = event.relationshipEventId;
  if (relationshipType == null || relationshipEventId == null) {
    return; // nothing to do
  }

  final byType =
      aggregatedEvents[relationshipEventId] ??= <String, Set<Event>>{};
  final related = byType[relationshipType] ??= <Event>{};

  // remove a potential old event
  _removeEventFromSet(related, event);
  // add the new one
  related.add(event);

  if (onChange != null) {
    final index = _findEvent(event_id: relationshipEventId);
    // `_findEvent` returns `events.length` when the related event is not
    // loaded; that is not a valid index, so there is nothing to report.
    if (index < events.length) {
      onChange?.call(index);
    }
  }
}
512 :
/// Removes [event] from the aggregation tree.
///
/// This drops the aggregations collected for [event] itself (keyed by its
/// event id and, if present, its transaction id) and removes [event] from
/// the aggregation sets of all other events.
void removeAggregatedEvent(Event event) {
  aggregatedEvents.remove(event.eventId);

  final transactionId = event.transactionId;
  if (transactionId != null) {
    aggregatedEvents.remove(transactionId);
  }

  final remainingSets =
      aggregatedEvents.values.expand((byType) => byType.values);
  for (final eventSet in remainingSets) {
    _removeEventFromSet(eventSet, event);
  }
}
524 :
/// Applies an incoming [event] of the given update [type] to this timeline.
///
/// Events of other rooms and update types other than timeline/history are
/// ignored. Timeline events always trigger [onNewEvent]; everything else is
/// skipped while [allowNewEvent] is `false`. A known event (matched by event
/// or transaction id) is replaced in place, otherwise the event is appended
/// (history) or inserted before the first non-error event (timeline).
/// Redaction events additionally update the event they redact. Unless
/// [update] is `false` or history updates are being collected, [onUpdate]
/// is called at the end. Any error is logged and swallowed.
void _handleEventUpdate(
  Event event,
  EventUpdateType type, {
  bool update = true,
}) {
  try {
    if (event.roomId != room.id) return;

    final isTimelineUpdate = type == EventUpdateType.timeline;
    final isHistoryUpdate = type == EventUpdateType.history;
    if (!isTimelineUpdate && !isHistoryUpdate) {
      return;
    }

    if (isTimelineUpdate) {
      onNewEvent?.call();
    }

    if (!allowNewEvent) return;

    final newStatus = event.status;

    final existingIndex = _findEvent(
      event_id: event.eventId,
      unsigned_txid: event.transactionId,
    );
    final isKnownEvent = existingIndex < events.length;

    if (isKnownEvent) {
      // if the old status is larger than the new one, we also want to
      // preserve the old status
      final oldStatus = events[existingIndex].status;
      events[existingIndex] = event;
      // do we preserve the status? we should allow 0 -> -1 updates and
      // status increases
      final oldStatusWins = latestEventStatus(newStatus, oldStatus) == oldStatus;
      if (oldStatusWins && (!newStatus.isError || !oldStatus.isSending)) {
        events[existingIndex].status = oldStatus;
      }
      addAggregatedEvent(events[existingIndex]);
      onChange?.call(existingIndex);
    } else {
      if (isHistoryUpdate &&
          events.indexWhere((e) => e.eventId == event.eventId) != -1) {
        return;
      }

      final int insertedAt;
      if (isHistoryUpdate) {
        insertedAt = events.length;
        events.add(event);
      } else {
        insertedAt = events.firstIndexWhereNotError;
        events.insert(insertedAt, event);
      }
      onInsert?.call(insertedAt);

      addAggregatedEvent(event);
    }

    // Handle redaction events
    if (event.type == EventTypes.Redaction) {
      final redactedIndex = _findEvent(event_id: event.redacts);
      if (redactedIndex < events.length) {
        removeAggregatedEvent(events[redactedIndex]);

        // Is the redacted event a reaction? Then update the event this
        // belongs to:
        if (onChange != null) {
          final relationshipEventId =
              events[redactedIndex].relationshipEventId;
          if (relationshipEventId != null) {
            // NOTE(review): this returns before the redaction is applied to
            // the event and before `onUpdate`, and only when `onChange` is
            // set. Confirm that this asymmetry is intended.
            onChange?.call(_findEvent(event_id: relationshipEventId));
            return;
          }
        }

        events[redactedIndex].setRedactionEvent(event);
        onChange?.call(redactedIndex);
      }
    }

    final shouldNotify = update && !_collectHistoryUpdates;
    if (shouldNotify) {
      onUpdate?.call();
    }
  } catch (e, s) {
    Logs().w('Handle event update failed', e, s);
  }
}
609 :
/// Searches this timeline and emits only the list of found events.
///
/// Forwards all arguments to [startSearch] and drops the batch token from
/// each emitted result.
@Deprecated('Use [startSearch] instead.')
Stream<List<Event>> searchEvent({
  String? searchTerm,
  int requestHistoryCount = 100,
  int maxHistoryRequests = 10,
  String? sinceEventId,
  int? limit,
  bool Function(Event)? searchFunc,
}) {
  final results = startSearch(
    searchTerm: searchTerm,
    requestHistoryCount: requestHistoryCount,
    maxHistoryRequests: maxHistoryRequests,
    // ignore: deprecated_member_use_from_same_package
    sinceEventId: sinceEventId,
    limit: limit,
    searchFunc: searchFunc,
  );
  return results.map((result) {
    final (foundEvents, _) = result;
    return foundEvents;
  });
}
628 :
/// Searches [searchTerm] in this timeline. It first searches in the
/// loaded events, then in the database and then on the server. The search
/// can take a while, which is why this returns a stream so the already
/// found events can already be displayed.
///
/// Override the [searchFunc] if you need another search. This will then
/// ignore [searchTerm]. Without [searchFunc], an event matches if its body
/// contains [searchTerm], ignoring case.
///
/// Every emitted record contains the list of all events found so far and,
/// for matches found on the server, the batch token at which the search can
/// be continued (`null` for local and database matches). Pass that token as
/// [prevBatch] to continue an earlier search: the local and database search
/// is skipped in that case, because it was already covered by the earlier
/// search and would only yield the same events again.
///
/// At most [maxHistoryRequests] server requests with [requestHistoryCount]
/// events each are made, and the server search stops once [limit] events
/// were found. A `M_FORBIDDEN` error from the server ends the search
/// silently; other [MatrixException]s are rethrown.
Stream<(List<Event>, String?)> startSearch({
  String? searchTerm,
  int requestHistoryCount = 100,
  int maxHistoryRequests = 10,
  String? prevBatch,
  @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
  int? limit,
  bool Function(Event)? searchFunc,
}) async* {
  assert(searchTerm != null || searchFunc != null);
  // Lower-case the term once instead of once per inspected event.
  final lowerCaseTerm = searchTerm?.toLowerCase() ?? '';
  searchFunc ??=
      (event) => event.body.toLowerCase().contains(lowerCaseTerm);
  final found = <Event>[];

  // A start position on the server means that everything available locally
  // is newer than the requested range (or was searched before).
  final startsOnServer = sinceEventId != null || prevBatch != null;

  if (!startsOnServer) {
    // Search locally
    for (final event in events) {
      if (searchFunc(event)) {
        yield (found..add(event), null);
      }
    }

    // Search in database
    var start = events.length;
    while (true) {
      final eventsFromStore = await room.client.database.getEventList(
        room,
        start: start,
        limit: requestHistoryCount,
      );
      if (eventsFromStore.isEmpty) break;
      start += eventsFromStore.length;
      for (final event in eventsFromStore) {
        if (searchFunc(event)) {
          yield (found..add(event), null);
        }
      }
    }
  }

  // Search on the server
  prevBatch ??= room.prev_batch;
  if (sinceEventId != null) {
    prevBatch =
        (await room.client.getEventContext(room.id, sinceEventId)).end;
  }
  final encryption = room.client.encryption;
  for (var i = 0; i < maxHistoryRequests; i++) {
    if (prevBatch == null) break;
    if (limit != null && found.length >= limit) break;
    try {
      final resp = await room.client.getRoomEvents(
        room.id,
        Direction.b,
        from: prevBatch,
        limit: requestHistoryCount,
        filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
      );
      for (final matrixEvent in resp.chunk) {
        var event = Event.fromMatrixEvent(matrixEvent, room);
        if (event.type == EventTypes.Encrypted && encryption != null) {
          event = await encryption.decryptRoomEvent(event);
          if (event.type == EventTypes.Encrypted &&
              event.messageType == MessageTypes.BadEncrypted &&
              event.content['can_request_session'] == true) {
            // Await requestKey() here to ensure decrypted message bodies
            await event.requestKey();
          }
        }
        if (searchFunc(event)) {
          yield (found..add(event), resp.end);
          if (limit != null && found.length >= limit) break;
        }
      }
      prevBatch = resp.end;
      // We are at the beginning of the room
      if (resp.chunk.length < requestHistoryCount) break;
    } on MatrixException catch (e) {
      // We have no permission anymore to request the history
      if (e.error == MatrixError.M_FORBIDDEN) {
        break;
      }
      rethrow;
    }
  }
  return;
}
724 : }
725 :
extension on List<Event> {
  /// The index of the first event whose status is not an error.
  ///
  /// Evaluates to [length] if there is no such event, which is `0` for an
  /// empty list.
  int get firstIndexWhereNotError {
    final index = indexWhere((event) => !event.status.isError);
    return index == -1 ? length : index;
  }
}
|