abstract Transport class, SocketTransport class (#115)

* abstract Transport class, SocketTransport class

* build: import simplexmq

* feat: simple io transport unit test for read and write

* more efficient buffer extension

Co-authored-by: alex <alex@tekartik.com>
This commit is contained in:
Evgeny Poberezkin
2021-10-09 12:42:41 +01:00
committed by GitHub
parent 09ace76b82
commit 19ef1f65db
13 changed files with 233 additions and 7 deletions

5
.gitignore vendored
View File

@@ -76,7 +76,9 @@ stack.yaml.lock
.packages
.pub-cache/
.pub/
/build/
build/
# Default behavior, only keep it for apps
pubspec.lock
# Web
lib/generated_plugin_registrant.dart
@@ -91,3 +93,4 @@ app.*.map.json
/android/app/debug
/android/app/profile
/android/app/release

2
packages/simplex_app/.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
# Keep for apps
!pubspec.lock

View File

@@ -1,3 +0,0 @@
# Omit committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

View File

@@ -4,5 +4,4 @@
library simplexmq;
export "src/protocol.dart";
// TODO: Export any libraries intended for clients of this package.
export "src/transport.dart" show Transport;

View File

@@ -0,0 +1,17 @@
import "dart:async";
import "dart:typed_data";
abstract class Transport {
Future<Uint8List> read(int n);
Future<void> write(Uint8List data);
}
Stream<Uint8List> blockStream(Transport t, int blockSize) async* {
try {
while (true) {
yield await t.read(blockSize);
}
} catch (e) {
return;
}
}

View File

@@ -1,7 +1,7 @@
name: simplexmq
description: A starting point for Dart libraries or applications.
version: 0.0.1
# homepage: https://www.example.com
publish_to: none
environment:
sdk: '>=2.14.0 <3.0.0'

View File

@@ -0,0 +1,3 @@
## 1.0.0
- Initial version.

View File

@@ -0,0 +1,39 @@
<!--
This README describes the package. If you publish this package to pub.dev,
this README's contents appear on the landing page for your package.
For information about how to write a good package README, see the guide for
[writing package pages](https://dart.dev/guides/libraries/writing-package-pages).
For general information about developing packages, see the Dart guide for
[creating packages](https://dart.dev/guides/libraries/create-library-packages)
and the Flutter guide for
[developing packages and plugins](https://flutter.dev/developing-packages).
-->
TODO: Put a short description of the package here that helps potential users
know whether this package might be useful for them.
## Features
TODO: List what your package can do. Maybe include images, gifs, or videos.
## Getting started
TODO: List prerequisites and provide or point to information on how to
start using the package.
## Usage
TODO: Include short and useful examples for package users. Add longer examples
to `/example` folder.
```dart
const like = 'sample';
```
## Additional information
TODO: Tell users more about the package: where to find more information, how to
contribute to the package, how to file issues, what response they can expect
from the package authors, and more.

View File

@@ -0,0 +1,6 @@
// import 'package:simplexmq_io/simplexmq_io.dart';
// void main() {
// var awesome = Awesome();
// print('awesome: ${awesome.isAwesome}');
// }

View File

@@ -0,0 +1,3 @@
library simplexmq_io;
export "src/socket.dart";

View File

@@ -0,0 +1,87 @@
import "dart:async";
import "dart:collection";
import "dart:io";
import "dart:typed_data";
import "package:simplexmq/simplexmq.dart";
class _STReaders {
final Completer<Uint8List> completer = Completer();
final int size;
_STReaders(this.size);
}
class SocketTransport implements Transport {
final Socket _socket;
late final StreamSubscription _subscription;
// ignore: unused_field
final Duration _timeout;
final int _bufferSize;
Uint8List _buffer = Uint8List(0);
final ListQueue<_STReaders> _readers = ListQueue(16);
SocketTransport._new(this._socket, this._timeout, this._bufferSize);
static Future<SocketTransport> connect(String host, int port,
{Duration timeout = const Duration(seconds: 1),
int bufferSize = 16384}) async {
final socket = await Socket.connect(host, port, timeout: timeout);
final t = SocketTransport._new(socket, timeout, bufferSize);
// ignore: cancel_subscriptions
final subscription = socket.listen(t._onData,
onError: (Object e) => t._finalize, onDone: t._finalize);
t._subscription = subscription;
return t;
}
@override
Future<Uint8List> read(int n) async {
if (_readers.isEmpty && _buffer.length >= n) {
final data = _buffer.sublist(0, n);
_buffer = _buffer.sublist(n);
return Future.value(data);
}
final r = _STReaders(n);
_readers.add(r);
return r.completer.future;
}
@override
Future<void> write(Uint8List data) {
_socket.add(data);
return _socket.flush();
}
void _onData(Uint8List data) {
final b = Uint8List(_buffer.length + data.length);
b.setAll(0, _buffer);
b.setAll(_buffer.length, data);
_buffer = b;
while (_readers.isNotEmpty && _readers.first.size <= _buffer.length) {
final r = _readers.removeFirst();
final d = _buffer.sublist(0, r.size);
r.completer.complete(d);
_buffer = _buffer.sublist(r.size);
}
final overflow = _buffer.length >= _bufferSize;
if (_subscription.isPaused && !overflow) {
_subscription.resume();
} else if (!_subscription.isPaused && overflow) {
_subscription.pause();
}
}
void _finalize() {
_subscription.cancel();
_socket.destroy();
while (_readers.isNotEmpty) {
final r = _readers.removeFirst();
r.completer.completeError(Exception("socket closed"));
}
}
/// Allow closing the client transport.
void close() {
_finalize();
}
}

View File

@@ -0,0 +1,22 @@
name: simplexmq_io
description: A starting point for Dart libraries or applications.
version: 1.0.0
publish_to: none
environment:
sdk: '>=2.14.0 <3.0.0'
dependencies:
simplexmq:
git:
url: git://github.com/simplex-chat/simplex-chat
path: packages/simplexmq
ref: ep/socket-transport
dependency_overrides:
simplexmq:
path: ../simplexmq
dev_dependencies:
lints: ^1.0.0
test: ^1.16.0

View File

@@ -0,0 +1,48 @@
// ignore_for_file: prefer_double_quotes
import 'dart:async';
import 'dart:io';
import 'dart:typed_data';
import 'package:simplexmq_io/simplexmq_io.dart';
import 'package:test/test.dart';
const localhost = 'localhost';
void main() {
group('transport', () {
Future<ServerSocket> startServer(
void Function(Socket client) handleConnection) async {
var server = await ServerSocket.bind(InternetAddress.anyIPv4, 0);
server.listen(handleConnection);
return server;
}
test('simple write', () async {
var completer = Completer<Uint8List>();
var server = await startServer((Socket client) {
client.listen(
(Uint8List data) async {
completer.complete(data);
},
);
});
var transport = await SocketTransport.connect(localhost, server.port);
await transport.write(Uint8List.fromList([1, 2, 3]));
expect(await completer.future, [1, 2, 3]);
transport.close();
await server.close();
});
test('simple read', () async {
var server = await startServer((Socket client) {
client.add(Uint8List.fromList([1, 2, 3]));
});
var transport = await SocketTransport.connect(localhost, server.port);
expect(await transport.read(3), [1, 2, 3]);
transport.close();
await server.close();
});
});
}