From 19ef1f65dbe43b98be255a181ad46571b4bc93c1 Mon Sep 17 00:00:00 2001 From: Evgeny Poberezkin <2769109+epoberezkin@users.noreply.github.com> Date: Sat, 9 Oct 2021 12:42:41 +0100 Subject: [PATCH] abstract Transport class, SocketTransport class (#115) * abstract Transport class, SocketTransport class * build: import simplexmq * feat: simple io transport unit test for read and write * more efficient buffer extension Co-authored-by: alex --- .gitignore | 5 +- packages/simplex_app/.gitignore | 2 + packages/simplexmq/.gitignore | 3 - packages/simplexmq/lib/simplexmq.dart | 3 +- packages/simplexmq/lib/src/transport.dart | 17 ++++ packages/simplexmq/pubspec.yaml | 2 +- packages/simplexmq_io/CHANGELOG.md | 3 + packages/simplexmq_io/README.md | 39 +++++++++ .../example/simplexmq_io_example.dart | 6 ++ packages/simplexmq_io/lib/simplexmq_io.dart | 3 + packages/simplexmq_io/lib/src/socket.dart | 87 +++++++++++++++++++ packages/simplexmq_io/pubspec.yaml | 22 +++++ .../simplexmq_io/test/simplexmq_io_test.dart | 48 ++++++++++ 13 files changed, 233 insertions(+), 7 deletions(-) create mode 100644 packages/simplex_app/.gitignore delete mode 100644 packages/simplexmq/.gitignore create mode 100644 packages/simplexmq/lib/src/transport.dart create mode 100644 packages/simplexmq_io/CHANGELOG.md create mode 100644 packages/simplexmq_io/README.md create mode 100644 packages/simplexmq_io/example/simplexmq_io_example.dart create mode 100644 packages/simplexmq_io/lib/simplexmq_io.dart create mode 100644 packages/simplexmq_io/lib/src/socket.dart create mode 100644 packages/simplexmq_io/pubspec.yaml create mode 100644 packages/simplexmq_io/test/simplexmq_io_test.dart diff --git a/.gitignore b/.gitignore index c75fab8da..13645d2ca 100644 --- a/.gitignore +++ b/.gitignore @@ -76,7 +76,9 @@ stack.yaml.lock .packages .pub-cache/ .pub/ -/build/ +build/ +# Default behavior, only keep it for apps +pubspec.lock # Web lib/generated_plugin_registrant.dart @@ -91,3 +93,4 @@ app.*.map.json /android/app/debug /android/app/profile /android/app/release + diff --git a/packages/simplex_app/.gitignore b/packages/simplex_app/.gitignore new file mode 100644 index 000000000..8d69d6287 --- /dev/null +++ b/packages/simplex_app/.gitignore @@ -0,0 +1,2 @@ +# Keep for apps +!pubspec.lock \ No newline at end of file diff --git a/packages/simplexmq/.gitignore b/packages/simplexmq/.gitignore deleted file mode 100644 index e5208175c..000000000 --- a/packages/simplexmq/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -# Omit committing pubspec.lock for library packages; see -# https://dart.dev/guides/libraries/private-files#pubspeclock. -pubspec.lock diff --git a/packages/simplexmq/lib/simplexmq.dart b/packages/simplexmq/lib/simplexmq.dart index 5d1551352..05617bfb1 100644 --- a/packages/simplexmq/lib/simplexmq.dart +++ b/packages/simplexmq/lib/simplexmq.dart @@ -4,5 +4,4 @@ library simplexmq; export "src/protocol.dart"; - -// TODO: Export any libraries intended for clients of this package. +export "src/transport.dart" show Transport; diff --git a/packages/simplexmq/lib/src/transport.dart b/packages/simplexmq/lib/src/transport.dart new file mode 100644 index 000000000..ceabc039a --- /dev/null +++ b/packages/simplexmq/lib/src/transport.dart @@ -0,0 +1,17 @@ +import "dart:async"; +import "dart:typed_data"; + +abstract class Transport { + Future read(int n); + Future write(Uint8List data); +} + +Stream blockStream(Transport t, int blockSize) async* { + try { + while (true) { + yield await t.read(blockSize); + } + } catch (e) { + return; + } +} diff --git a/packages/simplexmq/pubspec.yaml b/packages/simplexmq/pubspec.yaml index b82fce64c..ea5669f5f 100644 --- a/packages/simplexmq/pubspec.yaml +++ b/packages/simplexmq/pubspec.yaml @@ -1,7 +1,7 @@ name: simplexmq description: A starting point for Dart libraries or applications. version: 0.0.1 -# homepage: https://www.example.com +publish_to: none environment: sdk: '>=2.14.0 <3.0.0' diff --git a/packages/simplexmq_io/CHANGELOG.md b/packages/simplexmq_io/CHANGELOG.md new file mode 100644 index 000000000..effe43c82 --- /dev/null +++ b/packages/simplexmq_io/CHANGELOG.md @@ -0,0 +1,3 @@ +## 1.0.0 + +- Initial version. diff --git a/packages/simplexmq_io/README.md b/packages/simplexmq_io/README.md new file mode 100644 index 000000000..8b55e735b --- /dev/null +++ b/packages/simplexmq_io/README.md @@ -0,0 +1,39 @@ + + +TODO: Put a short description of the package here that helps potential users +know whether this package might be useful for them. + +## Features + +TODO: List what your package can do. Maybe include images, gifs, or videos. + +## Getting started + +TODO: List prerequisites and provide or point to information on how to +start using the package. + +## Usage + +TODO: Include short and useful examples for package users. Add longer examples +to `/example` folder. + +```dart +const like = 'sample'; +``` + +## Additional information + +TODO: Tell users more about the package: where to find more information, how to +contribute to the package, how to file issues, what response they can expect +from the package authors, and more. diff --git a/packages/simplexmq_io/example/simplexmq_io_example.dart b/packages/simplexmq_io/example/simplexmq_io_example.dart new file mode 100644 index 000000000..acd05a745 --- /dev/null +++ b/packages/simplexmq_io/example/simplexmq_io_example.dart @@ -0,0 +1,6 @@ +// import 'package:simplexmq_io/simplexmq_io.dart'; + +// void main() { +// var awesome = Awesome(); +// print('awesome: ${awesome.isAwesome}'); +// } diff --git a/packages/simplexmq_io/lib/simplexmq_io.dart b/packages/simplexmq_io/lib/simplexmq_io.dart new file mode 100644 index 000000000..f7327cbd8 --- /dev/null +++ b/packages/simplexmq_io/lib/simplexmq_io.dart @@ -0,0 +1,3 @@ +library simplexmq_io; + +export "src/socket.dart"; diff --git a/packages/simplexmq_io/lib/src/socket.dart b/packages/simplexmq_io/lib/src/socket.dart new file mode 100644 index 000000000..c9df7a259 --- /dev/null +++ b/packages/simplexmq_io/lib/src/socket.dart @@ -0,0 +1,87 @@ +import "dart:async"; +import "dart:collection"; +import "dart:io"; +import "dart:typed_data"; + +import "package:simplexmq/simplexmq.dart"; + +class _STReaders { + final Completer completer = Completer(); + final int size; + _STReaders(this.size); +} + +class SocketTransport implements Transport { + final Socket _socket; + late final StreamSubscription _subscription; + // ignore: unused_field + final Duration _timeout; + + final int _bufferSize; + Uint8List _buffer = Uint8List(0); + final ListQueue<_STReaders> _readers = ListQueue(16); + SocketTransport._new(this._socket, this._timeout, this._bufferSize); + + static Future connect(String host, int port, + {Duration timeout = const Duration(seconds: 1), + int bufferSize = 16384}) async { + final socket = await Socket.connect(host, port, timeout: timeout); + final t = SocketTransport._new(socket, timeout, bufferSize); + // ignore: cancel_subscriptions + final subscription = socket.listen(t._onData, + onError: (Object e) => t._finalize, onDone: t._finalize); + t._subscription = subscription; + return t; + } + + @override + Future read(int n) async { + if (_readers.isEmpty && _buffer.length >= n) { + final data = _buffer.sublist(0, n); + _buffer = _buffer.sublist(n); + return Future.value(data); + } + final r = _STReaders(n); + _readers.add(r); + return r.completer.future; + } + + @override + Future write(Uint8List data) { + _socket.add(data); + return _socket.flush(); + } + + void _onData(Uint8List data) { + final b = Uint8List(_buffer.length + data.length); + b.setAll(0, _buffer); + b.setAll(_buffer.length, data); + _buffer = b; + while (_readers.isNotEmpty && _readers.first.size <= _buffer.length) { + final r = _readers.removeFirst(); + final d = _buffer.sublist(0, r.size); + r.completer.complete(d); + _buffer = _buffer.sublist(r.size); + } + final overflow = _buffer.length >= _bufferSize; + if (_subscription.isPaused && !overflow) { + _subscription.resume(); + } else if (!_subscription.isPaused && overflow) { + _subscription.pause(); + } + } + + void _finalize() { + _subscription.cancel(); + _socket.destroy(); + while (_readers.isNotEmpty) { + final r = _readers.removeFirst(); + r.completer.completeError(Exception("socket closed")); + } + } + + /// Allow closing the client transport. + void close() { + _finalize(); + } +} diff --git a/packages/simplexmq_io/pubspec.yaml b/packages/simplexmq_io/pubspec.yaml new file mode 100644 index 000000000..e47cffcb9 --- /dev/null +++ b/packages/simplexmq_io/pubspec.yaml @@ -0,0 +1,22 @@ +name: simplexmq_io +description: A starting point for Dart libraries or applications. +version: 1.0.0 +publish_to: none + +environment: + sdk: '>=2.14.0 <3.0.0' + +dependencies: + simplexmq: + git: + url: git://github.com/simplex-chat/simplex-chat + path: packages/simplexmq + ref: ep/socket-transport + +dependency_overrides: + simplexmq: + path: ../simplexmq + +dev_dependencies: + lints: ^1.0.0 + test: ^1.16.0 diff --git a/packages/simplexmq_io/test/simplexmq_io_test.dart b/packages/simplexmq_io/test/simplexmq_io_test.dart new file mode 100644 index 000000000..d7cc9c239 --- /dev/null +++ b/packages/simplexmq_io/test/simplexmq_io_test.dart @@ -0,0 +1,48 @@ +// ignore_for_file: prefer_double_quotes + +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:simplexmq_io/simplexmq_io.dart'; +import 'package:test/test.dart'; + +const localhost = 'localhost'; +void main() { + group('transport', () { + Future startServer( + void Function(Socket client) handleConnection) async { + var server = await ServerSocket.bind(InternetAddress.anyIPv4, 0); + server.listen(handleConnection); + return server; + } + + test('simple write', () async { + var completer = Completer(); + + var server = await startServer((Socket client) { + client.listen( + (Uint8List data) async { + completer.complete(data); + }, + ); + }); + var transport = await SocketTransport.connect(localhost, server.port); + await transport.write(Uint8List.fromList([1, 2, 3])); + + expect(await completer.future, [1, 2, 3]); + transport.close(); + await server.close(); + }); + + test('simple read', () async { + var server = await startServer((Socket client) { + client.add(Uint8List.fromList([1, 2, 3])); + }); + var transport = await SocketTransport.connect(localhost, server.port); + expect(await transport.read(3), [1, 2, 3]); + transport.close(); + await server.close(); + }); + }); +}