send SMP command and received stream of messages

This commit is contained in:
Evgeny Poberezkin 2021-10-28 19:55:40 +01:00
parent 41681aaa6b
commit 1b6f4683d9
5 changed files with 210 additions and 26 deletions

View File

@ -6,9 +6,12 @@ import 'package:pointycastle/asymmetric/oaep.dart';
import 'package:pointycastle/asymmetric/rsa.dart';
import 'package:pointycastle/block/aes_fast.dart';
import 'package:pointycastle/block/modes/gcm.dart';
import 'package:pointycastle/digests/sha256.dart';
import 'package:pointycastle/key_generators/api.dart';
import 'package:pointycastle/key_generators/rsa_key_generator.dart';
import 'package:pointycastle/random/fortuna_random.dart';
import 'package:pointycastle/signers/rsa_signer.dart';
import 'buffer.dart' show empty;
class AESKey {
final Uint8List _key;
@ -44,7 +47,6 @@ Uint8List _randomBytes(int len, Random seedSource) {
return bytes;
}
final empty = Uint8List(0);
final paddingByte = '#'.codeUnitAt(0);
Uint8List encryptAES(AESKey key, Uint8List iv, int padTo, Uint8List data) {
@ -90,3 +92,19 @@ Uint8List decryptOAEP(RSAPrivateKey key, Uint8List data) {
..init(false, PrivateKeyParameter<RSAPrivateKey>(key));
return oaep.process(data);
}
Uint8List signPSS(RSAPrivateKey privateKey, Uint8List data) {
final signer = RSASigner(SHA256Digest(), '0609608648016503040201')
..init(true, PrivateKeyParameter<RSAPrivateKey>(privateKey));
return signer.generateSignature(data).bytes;
}
bool verifyPSS(RSAPublicKey publicKey, Uint8List data, Uint8List sig) {
final verifier = RSASigner(SHA256Digest(), '0609608648016503040201')
..init(false, PublicKeyParameter<RSAPublicKey>(publicKey));
try {
return verifier.verifySignature(data, RSASignature(sig));
} on ArgumentError {
return false;
}
}

View File

@ -17,6 +17,7 @@ final charDot = cc('.');
class Parser {
final Uint8List _s;
final List<int> _positions = [];
int _pos = 0;
bool _fail = false;
Parser(this._s);
@ -35,6 +36,20 @@ class Parser {
return res;
}
T? tryParse<T>(T? Function(Parser p) parse) {
if (_fail || _pos >= _s.length) {
_fail = true;
return null;
}
_positions.add(_pos);
final res = parse(this);
final prevPos = _positions.removeLast();
if (res == null) {
_pos = prevPos;
_fail = false;
}
}
// takes a required number of bytes
Uint8List? take(int len) => _run(() {
final end = _pos + len;

View File

@ -174,7 +174,7 @@ class ERR extends BrokerCommand {
: cmdErr = err == ErrorType.CMD
? throw ArgumentError('CMD error should be created with ERR.CMD')
: null;
ERR.cmd(this.cmdErr) : err = ErrorType.CMD;
ERR.cmd(CmdErrorType this.cmdErr) : err = ErrorType.CMD;
@override
Uint8List serialize() {
final _err = errorTags[err]!;

View File

@ -5,6 +5,7 @@ import 'package:pointycastle/digests/sha256.dart';
import 'buffer.dart';
import 'crypto.dart';
import 'parser.dart';
import 'protocol.dart';
import 'rsa_keys.dart';
abstract class Transport {
@ -13,16 +14,6 @@ abstract class Transport {
Future<void> close();
}
Stream<Uint8List> blockStream(Transport t, int blockSize) async* {
try {
while (true) {
yield await t.read(blockSize);
}
} catch (e) {
return;
}
}
class SMPServer {
final String host;
final int? port;
@ -59,11 +50,63 @@ const int _binaryRsaTransport = 0;
const int _transportBlockSize = 4096;
const int _maxTransportBlockSize = 65536;
class _Request {
final Uint8List queueId;
final Completer<BrokerResponse> completer = Completer();
_Request(this.queueId);
}
class Either<L, R> {
final L? left;
final R? right;
Either.left(L this.left) : right = null;
Either.right(R this.right) : left = null;
}
enum ClientErrorType { SMPServerError, SMPResponseError, SMPUnexpectedResponse }
class BrokerResponse {
final BrokerCommand? command;
final ClientErrorType? errorType;
final ERR? error;
BrokerResponse(BrokerCommand this.command)
: errorType = null,
error = null;
BrokerResponse.error(ClientErrorType this.errorType, this.error)
: command = null;
}
class ClientTransmission {
final String corrId;
final Uint8List queueId;
final ClientCommand command;
ClientTransmission(this.corrId, this.queueId, this.command);
Uint8List serialize() =>
unwordsN([encodeAscii(corrId), encode64(queueId), command.serialize()]);
}
class BrokerTransmission {
final String corrId;
final Uint8List queueId;
final BrokerCommand? command;
final ERR? error;
BrokerTransmission(this.corrId, this.queueId, BrokerCommand this.command)
: error = null;
BrokerTransmission.error(this.corrId, this.queueId, ERR this.error)
: command = null;
}
final badBlock = BrokerTransmission.error('', empty, ERR(ErrorType.BLOCK));
class SMPTransportClient {
final Transport _conn;
final _sndKey = SessionKey.create();
final _rcvKey = SessionKey.create();
final int blockSize;
int _corrId = 0;
bool _messageStreamCreated = false;
final Map<String, _Request> _sentCommands = {};
SMPTransportClient._(this._conn, this.blockSize);
static Future<SMPTransportClient> connect(Transport conn,
@ -75,6 +118,92 @@ class SMPTransportClient {
return _conn.close();
}
Future<BrokerResponse> sendSMPCommand(
RSAPrivateKey? key, Uint8List queueId, ClientCommand cmd) async {
final corrId = (++_corrId).toString();
final t = ClientTransmission(corrId, queueId, cmd).serialize();
final sig = key == null ? empty : encode64(signPSS(key, t));
final data = unwordsN([sig, t, empty]);
final r = _sentCommands[corrId] = _Request(queueId);
await _writeEncrypted(data);
print('block sent');
return r.completer.future;
}
Stream<BrokerTransmission> messageStream() {
if (_messageStreamCreated) {
throw Exception('message stream already created');
}
_messageStreamCreated = true;
return _messageStream();
}
Stream<BrokerTransmission> _messageStream() async* {
try {
while (true) {
final block = await _readEncrypted();
final t = _parseBrokerTransmission(block);
print('block received');
print(t);
if (t.corrId == '') {
yield t;
} else {
final r = _sentCommands.remove(t.corrId);
if (r == null) {
yield t;
} else {
final cmd = t.command;
r.completer.complete(r.queueId.equal(t.queueId)
? cmd == null
? BrokerResponse.error(
ClientErrorType.SMPResponseError, t.error)
: cmd is ERR
? BrokerResponse.error(
ClientErrorType.SMPServerError, cmd)
: BrokerResponse(cmd)
: BrokerResponse.error(
ClientErrorType.SMPUnexpectedResponse, null));
}
}
}
} catch (e) {
return;
}
}
static BrokerTransmission _parseBrokerTransmission(Uint8List s) {
final p = Parser(s);
p.space();
final cId = p.word();
p.space();
final queueId = p.tryParse((p) => p.base64()) ?? empty;
p.space();
if (p.fail || cId == null) return badBlock;
final corrId = decodeAscii(cId);
final command = smpCommandP(p);
if (command == null) {
return BrokerTransmission.error(
corrId, queueId, ERR.cmd(CmdErrorType.SYNTAX));
}
if (command is! BrokerCommand) {
return BrokerTransmission.error(
corrId, queueId, ERR.cmd(CmdErrorType.PROHIBITED));
}
final qErr = _tQueueError(queueId, command);
if (qErr != null) {
return BrokerTransmission.error(corrId, queueId, ERR.cmd(qErr));
}
return BrokerTransmission(corrId, queueId, command);
}
static CmdErrorType? _tQueueError(Uint8List queueId, BrokerCommand cmd) {
if (cmd is IDS || cmd is PONG) {
if (queueId.isNotEmpty) return CmdErrorType.HAS_AUTH;
} else if (cmd is! ERR && queueId.isEmpty) {
return CmdErrorType.NO_QUEUE;
}
}
static Future<SMPTransportClient> _clientHandshake(
Transport conn, Uint8List? keyHash, int? blkSize) async {
final srv = await _getHeaderAndPublicKey_1_2(conn, keyHash);
@ -166,6 +295,12 @@ class SMPTransportClient {
return decryptAES(_rcvKey.aesKey, iv, block);
}
Future<void> _writeEncrypted(Uint8List data) {
final iv = _nextIV(_sndKey);
final block = encryptAES(_sndKey.aesKey, iv, blockSize, data);
return _conn.write(block);
}
static Uint8List _nextIV(SessionKey sk) {
final c = encodeInt32(sk._counter++);
final start = sk.baseIV.sublist(0, 4);

View File

@ -1,21 +1,37 @@
// import 'dart:io';
import 'package:simplexmq/simplexmq.dart';
import 'package:simplexmq/src/buffer.dart';
import 'package:simplexmq/src/crypto.dart';
import 'package:simplexmq/src/rsa_keys.dart';
import 'package:simplexmq_io/simplexmq_io.dart';
import 'package:test/test.dart';
final keyHash =
decode64(encodeAscii('pH7bg7B6vB3uJ1poKmClTAqr7yYWnAtapnIDN7ypKxU='));
void main() {
group('SMP transport', () {
test(
'establish connection (expects SMP server on localhost:5223)',
() async {
final conn = await SocketTransport.connect('localhost', 5223);
final smp = await SMPTransportClient.connect(conn,
keyHash: decode64(
encodeAscii('pH7bg7B6vB3uJ1poKmClTAqr7yYWnAtapnIDN7ypKxU=')));
expect(smp is SMPTransportClient, true);
},
skip: 'requires SMP server on port 5223',
);
});
group('SMP transport (expects SMP server on localhost:5223)', () {
test('establish connection', () async {
final conn = await SocketTransport.connect('localhost', 5223);
final smp = await SMPTransportClient.connect(conn, keyHash: keyHash);
expect(smp is SMPTransportClient, true);
});
test('should create SMP queue and send message', () async {
final conn1 = await SocketTransport.connect('localhost', 5223);
final alice = await SMPTransportClient.connect(conn1, keyHash: keyHash);
final aliceKeys = generateRSAkeyPair();
final rcvKeyStr = encode64(encodeRsaPubKey(aliceKeys.publicKey));
// final conn2 = await SocketTransport.connect('localhost', 5223);
// final bob = await SMPTransportClient.connect(conn2, keyHash: keyHash);
// final bobKeys = generateRSAkeyPair();
// final sndKeyStr = encode64(encodeRsaPubKey(bobKeys.publicKey));
// print('we are here');
final resp = await alice.sendSMPCommand(
aliceKeys.privateKey, empty, NEW(rcvKeyStr));
print(resp);
});
// });
}, skip: 'requires SMP server on port 5223');
}