mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge branch 'main' into invalid-driver
This commit is contained in:
commit
a29248c96d
@ -93,7 +93,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.bouncycastle</groupId>
|
<groupId>org.bouncycastle</groupId>
|
||||||
<artifactId>bcprov-jdk18on</artifactId>
|
<artifactId>bcprov-jdk18on</artifactId>
|
||||||
<version>1.77</version>
|
<version>1.78</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.fasterxml.jackson.core</groupId>
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
@ -6,26 +6,26 @@ options {
|
|||||||
|
|
||||||
// Operators and Punctuators
|
// Operators and Punctuators
|
||||||
|
|
||||||
LR_BRACKET: '(';
|
LR_BRACKET: '(' -> pushMode(IDENTIFIER_MODE);
|
||||||
RR_BRACKET: ')';
|
RR_BRACKET: ')';
|
||||||
LC_BRACKET: '{';
|
LC_BRACKET: '{' -> pushMode(IDENTIFIER_MODE);
|
||||||
RC_BRACKET: '}';
|
RC_BRACKET: '}';
|
||||||
LS_BRACKET: '[';
|
LS_BRACKET: '[' -> pushMode(IDENTIFIER_MODE);
|
||||||
RS_BRACKET: ']';
|
RS_BRACKET: ']';
|
||||||
COMMA: ',';
|
COMMA: ',' -> pushMode(IDENTIFIER_MODE);
|
||||||
SEMI: ';';
|
SEMI: ';';
|
||||||
COLON: ':';
|
COLON: ':';
|
||||||
DOT: '.';
|
DOT: '.' -> pushMode(IDENTIFIER_MODE);
|
||||||
STAR: '*';
|
STAR: '*';
|
||||||
DIVIDE: '/';
|
DIVIDE: '/';
|
||||||
MODULE: '%';
|
MODULE: '%';
|
||||||
PLUS: '+';
|
PLUS: '+' -> pushMode(IDENTIFIER_MODE);
|
||||||
MINUSMINUS: '--';
|
MINUSMINUS: '--';
|
||||||
MINUS: '-';
|
MINUS: '-';
|
||||||
DQUOTE: '"';
|
DQUOTE: '"';
|
||||||
SQUOTE: '\'';
|
SQUOTE: '\'';
|
||||||
OPERATOR_EQ: '=';
|
OPERATOR_EQ: '=' -> pushMode(IDENTIFIER_MODE);
|
||||||
OPERATOR_LT: '<';
|
OPERATOR_LT: '<' -> pushMode(IDENTIFIER_MODE);
|
||||||
OPERATOR_GT: '>';
|
OPERATOR_GT: '>';
|
||||||
OPERATOR_LTE: '<=';
|
OPERATOR_LTE: '<=';
|
||||||
OPERATOR_GTE: '>=';
|
OPERATOR_GTE: '>=';
|
||||||
@ -37,7 +37,7 @@ K_AGGREGATE: 'AGGREGATE';
|
|||||||
K_ALL: 'ALL';
|
K_ALL: 'ALL';
|
||||||
K_ALLOW: 'ALLOW';
|
K_ALLOW: 'ALLOW';
|
||||||
K_ALTER: 'ALTER';
|
K_ALTER: 'ALTER';
|
||||||
K_AND: 'AND';
|
K_AND: 'AND' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_ANY: 'ANY';
|
K_ANY: 'ANY';
|
||||||
K_APPLY: 'APPLY';
|
K_APPLY: 'APPLY';
|
||||||
K_AS: 'AS';
|
K_AS: 'AS';
|
||||||
@ -63,7 +63,7 @@ K_DURABLE_WRITES: 'DURABLE_WRITES';
|
|||||||
K_EACH_QUORUM: 'EACH_QUORUM';
|
K_EACH_QUORUM: 'EACH_QUORUM';
|
||||||
K_ENTRIES: 'ENTRIES';
|
K_ENTRIES: 'ENTRIES';
|
||||||
K_EXECUTE: 'EXECUTE';
|
K_EXECUTE: 'EXECUTE';
|
||||||
K_EXISTS: 'EXISTS';
|
K_EXISTS: 'EXISTS' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_FALSE: 'FALSE';
|
K_FALSE: 'FALSE';
|
||||||
K_FILTERING: 'FILTERING';
|
K_FILTERING: 'FILTERING';
|
||||||
K_FINALFUNC: 'FINALFUNC';
|
K_FINALFUNC: 'FINALFUNC';
|
||||||
@ -84,7 +84,7 @@ K_IS: 'IS';
|
|||||||
K_JSON: 'JSON';
|
K_JSON: 'JSON';
|
||||||
K_KEY: 'KEY';
|
K_KEY: 'KEY';
|
||||||
K_KEYS: 'KEYS';
|
K_KEYS: 'KEYS';
|
||||||
K_KEYSPACE: 'KEYSPACE';
|
K_KEYSPACE: 'KEYSPACE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_KEYSPACES: 'KEYSPACES';
|
K_KEYSPACES: 'KEYSPACES';
|
||||||
K_LANGUAGE: 'LANGUAGE';
|
K_LANGUAGE: 'LANGUAGE';
|
||||||
// Disabled because there was no definitive reference to this as a bare keyword in the specs
|
// Disabled because there was no definitive reference to this as a bare keyword in the specs
|
||||||
@ -101,8 +101,8 @@ K_NORECURSIVE: 'NORECURSIVE';
|
|||||||
K_NOSUPERUSER: 'NOSUPERUSER';
|
K_NOSUPERUSER: 'NOSUPERUSER';
|
||||||
K_NOT: 'NOT';
|
K_NOT: 'NOT';
|
||||||
K_NULL: 'NULL';
|
K_NULL: 'NULL';
|
||||||
K_OF: 'OF';
|
K_OF: 'OF' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_ON: 'ON';
|
K_ON: 'ON' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_ONE: 'ONE';
|
K_ONE: 'ONE';
|
||||||
K_OPTIONS: 'OPTIONS';
|
K_OPTIONS: 'OPTIONS';
|
||||||
K_OR: 'OR';
|
K_OR: 'OR';
|
||||||
@ -114,7 +114,7 @@ K_PERMISSION: 'PERMISSION';
|
|||||||
K_PERMISSIONS: 'PERMISSIONS';
|
K_PERMISSIONS: 'PERMISSIONS';
|
||||||
K_PRIMARY: 'PRIMARY';
|
K_PRIMARY: 'PRIMARY';
|
||||||
K_QUORUM: 'QUORUM';
|
K_QUORUM: 'QUORUM';
|
||||||
K_RENAME: 'RENAME';
|
K_RENAME: 'RENAME' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_REPLACE: 'REPLACE';
|
K_REPLACE: 'REPLACE';
|
||||||
K_REPLICATION: 'REPLICATION';
|
K_REPLICATION: 'REPLICATION';
|
||||||
K_RETURNS: 'RETURNS';
|
K_RETURNS: 'RETURNS';
|
||||||
@ -123,32 +123,34 @@ K_ROLE: 'ROLE';
|
|||||||
K_ROLES: 'ROLES';
|
K_ROLES: 'ROLES';
|
||||||
K_SCHEMA: 'SCHEMA';
|
K_SCHEMA: 'SCHEMA';
|
||||||
K_SELECT: 'SELECT';
|
K_SELECT: 'SELECT';
|
||||||
K_SET: 'SET';
|
K_SET_WITH_OPERATOR_LT: K_SET SPACE? OPERATOR_LT -> pushMode(IDENTIFIER_MODE);
|
||||||
|
K_SET: 'SET' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_SFUNC: 'SFUNC';
|
K_SFUNC: 'SFUNC';
|
||||||
K_STATIC: 'STATIC';
|
K_STATIC: 'STATIC';
|
||||||
K_STORAGE: 'STORAGE';
|
K_STORAGE: 'STORAGE';
|
||||||
K_STYPE: 'STYPE';
|
K_STYPE: 'STYPE';
|
||||||
K_SUPERUSER: 'SUPERUSER';
|
K_SUPERUSER: 'SUPERUSER';
|
||||||
K_TABLE: 'TABLE';
|
K_TABLE: 'TABLE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_THREE: 'THREE';
|
K_THREE: 'THREE';
|
||||||
|
K_TIMESTAMP_WITH_DECIMAL_LITERAL: K_TIMESTAMP SPACE DECIMAL_LITERAL;
|
||||||
K_TIMESTAMP: 'TIMESTAMP';
|
K_TIMESTAMP: 'TIMESTAMP';
|
||||||
K_TO: 'TO';
|
K_TO: 'TO' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_TOKEN: 'TOKEN';
|
K_TOKEN: 'TOKEN';
|
||||||
K_TRIGGER: 'TRIGGER';
|
K_TRIGGER: 'TRIGGER';
|
||||||
K_TRUE: 'TRUE';
|
K_TRUE: 'TRUE';
|
||||||
K_TRUNCATE: 'TRUNCATE';
|
K_TRUNCATE: 'TRUNCATE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_TTL: 'TTL';
|
K_TTL: 'TTL';
|
||||||
K_TWO: 'TWO';
|
K_TWO: 'TWO';
|
||||||
K_TYPE: 'TYPE';
|
K_TYPE: 'TYPE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_UNLOGGED: 'UNLOGGED';
|
K_UNLOGGED: 'UNLOGGED';
|
||||||
K_UPDATE: 'UPDATE';
|
K_UPDATE: 'UPDATE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_USE: 'USE';
|
K_USE: 'USE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_USER: 'USER';
|
K_USER: 'USER';
|
||||||
K_USING: 'USING';
|
K_USING: 'USING';
|
||||||
K_UUID: 'UUID';
|
K_UUID: 'UUID';
|
||||||
K_VALUES: 'VALUES';
|
K_VALUES: 'VALUES';
|
||||||
K_VIEW: 'VIEW';
|
K_VIEW: 'VIEW';
|
||||||
K_WHERE: 'WHERE';
|
K_WHERE: 'WHERE' -> pushMode(IDENTIFIER_MODE);
|
||||||
K_WITH: 'WITH';
|
K_WITH: 'WITH';
|
||||||
K_WRITETIME: 'WRITETIME';
|
K_WRITETIME: 'WRITETIME';
|
||||||
K_ASCII: 'ASCII';
|
K_ASCII: 'ASCII';
|
||||||
@ -160,10 +162,13 @@ K_DATE: 'DATE';
|
|||||||
K_DECIMAL: 'DECIMAL';
|
K_DECIMAL: 'DECIMAL';
|
||||||
K_DOUBLE: 'DOUBLE';
|
K_DOUBLE: 'DOUBLE';
|
||||||
K_FLOAT: 'FLOAT';
|
K_FLOAT: 'FLOAT';
|
||||||
|
K_FROZEN_WITH_OPERATOR_LT: K_FROZEN SPACE? OPERATOR_LT -> pushMode(IDENTIFIER_MODE);
|
||||||
K_FROZEN: 'FROZEN';
|
K_FROZEN: 'FROZEN';
|
||||||
K_INET: 'INET';
|
K_INET: 'INET';
|
||||||
K_INT: 'INT';
|
K_INT: 'INT';
|
||||||
|
K_LIST_WITH_OPERATOR_LT: K_LIST SPACE? OPERATOR_LT -> pushMode(IDENTIFIER_MODE);
|
||||||
K_LIST: 'LIST';
|
K_LIST: 'LIST';
|
||||||
|
K_MAP_WITH_OPERATOR_LT: K_MAP SPACE? OPERATOR_LT -> pushMode(IDENTIFIER_MODE);
|
||||||
K_MAP: 'MAP';
|
K_MAP: 'MAP';
|
||||||
K_SMALLINT: 'SMALLINT';
|
K_SMALLINT: 'SMALLINT';
|
||||||
K_TEXT: 'TEXT';
|
K_TEXT: 'TEXT';
|
||||||
@ -221,5 +226,99 @@ fragment DEC_DIGIT: [0-9];
|
|||||||
|
|
||||||
fragment EXPONENT_NUM_PART: 'E' ('-'|'+') ? DEC_DIGIT+;
|
fragment EXPONENT_NUM_PART: 'E' ('-'|'+') ? DEC_DIGIT+;
|
||||||
|
|
||||||
|
mode IDENTIFIER_MODE;
|
||||||
|
|
||||||
|
SEMI_: SEMI -> type(SEMI), popMode;
|
||||||
|
LC_BRACKET_: LC_BRACKET -> type(LC_BRACKET);
|
||||||
|
LS_BRACKET_: LS_BRACKET -> type(LS_BRACKET);
|
||||||
|
LR_BRACKET_: LR_BRACKET -> type(LR_BRACKET);
|
||||||
|
OPERATOR_LT_: OPERATOR_LT -> type(OPERATOR_LT);
|
||||||
|
|
||||||
|
RR_BRACKET_: RR_BRACKET -> type(RR_BRACKET), popMode;
|
||||||
|
RC_BRACKET_: RC_BRACKET -> type(RC_BRACKET), popMode;
|
||||||
|
RS_BRACKET_: RS_BRACKET -> type(RS_BRACKET), popMode;
|
||||||
|
|
||||||
|
OPERATOR_GT_: OPERATOR_GT -> type(OPERATOR_GT), popMode;
|
||||||
|
|
||||||
|
// Reserved Keywords which cannot be used as identifiers
|
||||||
|
// https://cassandra.apache.org/doc/4.1/cassandra/cql/appendices.html#appendix-A
|
||||||
|
K_ADD_: K_ADD -> type(K_ADD), popMode;
|
||||||
|
K_AGGREGATE_: K_AGGREGATE -> type(K_AGGREGATE), popMode;
|
||||||
|
K_ALLOW_: K_ALLOW -> type(K_ALLOW), popMode;
|
||||||
|
K_ALTER_: K_ALTER -> type(K_ALTER), popMode;
|
||||||
|
K_AND_: K_AND -> type(K_AND), popMode;
|
||||||
|
K_ANY_: K_ANY -> type(K_ANY), popMode;
|
||||||
|
K_APPLY_: K_APPLY -> type(K_APPLY), popMode;
|
||||||
|
K_ASC_: K_ASC -> type(K_ASC), popMode;
|
||||||
|
K_AUTHORIZE_: K_AUTHORIZE -> type(K_AUTHORIZE), popMode;
|
||||||
|
K_BATCH_: K_BATCH -> type(K_BATCH), popMode;
|
||||||
|
K_BEGIN_: K_BEGIN -> type(K_BEGIN), popMode;
|
||||||
|
K_BY_: K_BY -> type(K_BY), popMode;
|
||||||
|
K_COLUMNFAMILY_: K_COLUMNFAMILY -> type(K_COLUMNFAMILY), popMode;
|
||||||
|
K_CREATE_: K_CREATE -> type(K_CREATE), popMode;
|
||||||
|
K_DELETE_: K_DELETE -> type(K_DELETE), popMode;
|
||||||
|
K_DESC_: K_DESC -> type(K_DESC), popMode;
|
||||||
|
K_DROP_: K_DROP -> type(K_DROP), popMode;
|
||||||
|
K_DURABLE_WRITES_: K_DURABLE_WRITES -> type(K_DURABLE_WRITES), popMode;
|
||||||
|
K_ENTRIES_: K_ENTRIES -> type(K_ENTRIES), popMode;
|
||||||
|
K_FALSE_: K_FALSE -> type(K_FALSE), popMode;
|
||||||
|
K_FROM_: K_FROM -> type(K_FROM), popMode;
|
||||||
|
K_FULL_: K_FULL -> type(K_FULL), popMode;
|
||||||
|
K_GRANT_: K_GRANT -> type(K_GRANT), popMode;
|
||||||
|
K_IF_: K_IF -> type(K_IF), popMode;
|
||||||
|
K_IN_: K_IN -> type(K_IN), popMode;
|
||||||
|
K_INDEX_: K_INDEX -> type(K_INDEX), popMode;
|
||||||
|
K_INFINITY_: K_INFINITY -> type(K_INFINITY), popMode;
|
||||||
|
K_INSERT_: K_INSERT -> type(K_INSERT), popMode;
|
||||||
|
K_INTO_: K_INTO -> type(K_INTO), popMode;
|
||||||
|
//K_IS: 'IS';
|
||||||
|
K_KEYSPACE_: K_KEYSPACE -> type(K_KEYSPACE), popMode;
|
||||||
|
K_LIMIT_: K_LIMIT -> type(K_LIMIT), popMode;
|
||||||
|
K_LOGGED_: K_LOGGED -> type(K_LOGGED), popMode;
|
||||||
|
K_MODIFY_: K_MODIFY -> type(K_MODIFY), popMode;
|
||||||
|
K_NAN_: K_NAN -> type(K_NAN), popMode;
|
||||||
|
K_NORECURSIVE_: K_NORECURSIVE -> type(K_NORECURSIVE), popMode;
|
||||||
|
K_NOT_: K_NOT -> type(K_NOT), popMode;
|
||||||
|
K_NULL_: K_NULL -> type(K_NULL), popMode;
|
||||||
|
K_OF_: K_OF -> type(K_OF), popMode;
|
||||||
|
K_ON_: K_ON -> type(K_ON), popMode;
|
||||||
|
K_OR_: K_OR -> type(K_OR), popMode;
|
||||||
|
K_ORDER_: K_ORDER -> type(K_ORDER), popMode;
|
||||||
|
K_PRIMARY_: K_PRIMARY -> type(K_PRIMARY), popMode;
|
||||||
|
K_RENAME_: K_RENAME -> type(K_RENAME), popMode;
|
||||||
|
K_REPLACE_: K_REPLACE -> type(K_REPLACE), popMode;
|
||||||
|
K_REVOKE_: K_REVOKE -> type(K_REVOKE), popMode;
|
||||||
|
K_SCHEMA_: K_SCHEMA -> type(K_SCHEMA), popMode;
|
||||||
|
K_SELECT_: K_SELECT -> type(K_SELECT), popMode;
|
||||||
|
K_SET_: K_SET -> type(K_SET), popMode;
|
||||||
|
K_TABLE_: K_TABLE -> type(K_TABLE), popMode;
|
||||||
|
K_TO_: K_TO -> type(K_TO), popMode;
|
||||||
|
K_TOKEN_: K_TOKEN -> type(K_TOKEN), popMode;
|
||||||
|
K_TRUNCATE_: K_TRUNCATE -> type(K_TRUNCATE), popMode;
|
||||||
|
K_UNLOGGED_: K_UNLOGGED -> type(K_UNLOGGED), popMode;
|
||||||
|
K_UPDATE_: K_UPDATE -> type(K_UPDATE), popMode;
|
||||||
|
K_USE_: K_USE -> type(K_USE), popMode;
|
||||||
|
K_USING_: K_USING -> type(K_USING), popMode;
|
||||||
|
K_WHERE_: K_WHERE -> type(K_WHERE), popMode;
|
||||||
|
K_WITH_: K_WITH -> type(K_WITH), popMode;
|
||||||
|
|
||||||
|
// handeling cases like 'frozen<','map<','list<','set<' and 'TIMESTAMP WITH 1234'
|
||||||
|
K_MAP_WITH_OPERATOR_LT_: K_MAP_WITH_OPERATOR_LT -> type(K_MAP_WITH_OPERATOR_LT);
|
||||||
|
K_TIMESTAMP_WITH_DECIMAL_LITERAL_: K_TIMESTAMP_WITH_DECIMAL_LITERAL -> type(K_TIMESTAMP_WITH_DECIMAL_LITERAL);
|
||||||
|
K_FROZEN_WITH_OPERATOR_LT_: K_FROZEN_WITH_OPERATOR_LT -> type(K_FROZEN_WITH_OPERATOR_LT);
|
||||||
|
K_SET_WITH_OPERATOR_LT_: K_SET_WITH_OPERATOR_LT -> type(K_SET_WITH_OPERATOR_LT);
|
||||||
|
K_LIST_WITH_OPERATOR_LT_: K_LIST_WITH_OPERATOR_LT -> type(K_LIST_WITH_OPERATOR_LT);
|
||||||
|
|
||||||
|
OBJECT_NAME_ : OBJECT_NAME -> type(OBJECT_NAME), popMode;
|
||||||
|
SPACE_: [ \t\r\n]+ -> channel (HIDDEN);
|
||||||
|
|
||||||
|
UUID_: UUID -> type(UUID), popMode;
|
||||||
|
|
||||||
|
// Literals
|
||||||
|
CODE_BLOCK_: CODE_BLOCK -> type(CODE_BLOCK), popMode;
|
||||||
|
STRING_LITERAL_: STRING_LITERAL -> type(STRING_LITERAL), popMode;
|
||||||
|
DECIMAL_LITERAL_: DECIMAL_LITERAL -> type(DECIMAL_LITERAL), popMode;
|
||||||
|
FLOAT_LITERAL_: FLOAT_LITERAL -> type(FLOAT_LITERAL), popMode;
|
||||||
|
HEXADECIMAL_LITERAL_: HEXADECIMAL_LITERAL -> type(HEXADECIMAL_LITERAL), popMode;
|
||||||
|
REAL_LITERAL_ : REAL_LITERAL -> type(REAL_LITERAL), popMode;
|
||||||
|
|
||||||
|
@ -136,7 +136,7 @@ createType
|
|||||||
;
|
;
|
||||||
|
|
||||||
typeMemberColumnList
|
typeMemberColumnList
|
||||||
: column dataType (syntaxComma column dataType)*
|
: column dataType (syntaxComma column dataType)* syntaxComma?
|
||||||
;
|
;
|
||||||
|
|
||||||
createTrigger
|
createTrigger
|
||||||
@ -244,8 +244,8 @@ alterType
|
|||||||
;
|
;
|
||||||
|
|
||||||
alterTypeOperation
|
alterTypeOperation
|
||||||
: alterTypeAlterType
|
// : alterTypeAlterType
|
||||||
| alterTypeAdd
|
: alterTypeAdd
|
||||||
| alterTypeRename
|
| alterTypeRename
|
||||||
;
|
;
|
||||||
|
|
||||||
@ -262,7 +262,7 @@ alterTypeRenameItem
|
|||||||
;
|
;
|
||||||
|
|
||||||
alterTypeAdd
|
alterTypeAdd
|
||||||
: kwAdd column dataType (syntaxComma column dataType)*
|
: kwAdd column dataType
|
||||||
;
|
;
|
||||||
|
|
||||||
alterTypeAlterType
|
alterTypeAlterType
|
||||||
@ -615,7 +615,7 @@ usingTtlTimestamp
|
|||||||
;
|
;
|
||||||
|
|
||||||
timestamp
|
timestamp
|
||||||
: kwTimestamp decimalLiteral
|
: K_TIMESTAMP_WITH_DECIMAL_LITERAL
|
||||||
;
|
;
|
||||||
|
|
||||||
ttl
|
ttl
|
||||||
@ -767,6 +767,7 @@ stringLiteral
|
|||||||
booleanLiteral
|
booleanLiteral
|
||||||
: K_TRUE
|
: K_TRUE
|
||||||
| K_FALSE
|
| K_FALSE
|
||||||
|
| OBJECT_NAME
|
||||||
;
|
;
|
||||||
|
|
||||||
hexadecimalLiteral
|
hexadecimalLiteral
|
||||||
@ -790,10 +791,10 @@ column
|
|||||||
|
|
||||||
dataType
|
dataType
|
||||||
: dataTypeName
|
: dataTypeName
|
||||||
| K_FROZEN syntaxBracketLa dataType syntaxBracketRa
|
| K_FROZEN_WITH_OPERATOR_LT dataType syntaxBracketRa
|
||||||
| K_SET syntaxBracketLa dataType syntaxBracketRa
|
| K_SET_WITH_OPERATOR_LT dataType syntaxBracketRa
|
||||||
| K_LIST syntaxBracketLa dataType syntaxBracketRa
|
| K_LIST_WITH_OPERATOR_LT dataType syntaxBracketRa
|
||||||
| K_MAP syntaxBracketLa dataType syntaxComma dataType syntaxBracketRa
|
| K_MAP_WITH_OPERATOR_LT dataType syntaxComma dataType syntaxBracketRa
|
||||||
;
|
;
|
||||||
|
|
||||||
dataTypeName
|
dataTypeName
|
||||||
@ -901,6 +902,7 @@ kwAggregate
|
|||||||
|
|
||||||
kwAll
|
kwAll
|
||||||
: K_ALL
|
: K_ALL
|
||||||
|
| OBJECT_NAME
|
||||||
;
|
;
|
||||||
|
|
||||||
kwAllPermissions
|
kwAllPermissions
|
||||||
@ -1101,6 +1103,7 @@ kwLogged
|
|||||||
|
|
||||||
kwLogin
|
kwLogin
|
||||||
: K_LOGIN
|
: K_LOGIN
|
||||||
|
| OBJECT_NAME
|
||||||
;
|
;
|
||||||
|
|
||||||
kwMaterialized
|
kwMaterialized
|
||||||
@ -1137,6 +1140,7 @@ kwOn
|
|||||||
|
|
||||||
kwOptions
|
kwOptions
|
||||||
: K_OPTIONS
|
: K_OPTIONS
|
||||||
|
| OBJECT_NAME
|
||||||
;
|
;
|
||||||
|
|
||||||
kwOr
|
kwOr
|
||||||
@ -1149,6 +1153,7 @@ kwOrder
|
|||||||
|
|
||||||
kwPassword
|
kwPassword
|
||||||
: K_PASSWORD
|
: K_PASSWORD
|
||||||
|
| OBJECT_NAME
|
||||||
;
|
;
|
||||||
|
|
||||||
kwPrimary
|
kwPrimary
|
||||||
@ -1201,6 +1206,7 @@ kwStype
|
|||||||
|
|
||||||
kwSuperuser
|
kwSuperuser
|
||||||
: K_SUPERUSER
|
: K_SUPERUSER
|
||||||
|
| OBJECT_NAME
|
||||||
;
|
;
|
||||||
|
|
||||||
kwTable
|
kwTable
|
||||||
|
@ -21,6 +21,12 @@ import io.nosqlbench.cqlgen.parser.CqlModelParser;
|
|||||||
import org.junit.jupiter.api.Disabled;
|
import org.junit.jupiter.api.Disabled;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
public class CqlParserHarnessTest {
|
public class CqlParserHarnessTest {
|
||||||
|
|
||||||
private final static String ksddl = """
|
private final static String ksddl = """
|
||||||
@ -81,5 +87,22 @@ public class CqlParserHarnessTest {
|
|||||||
""", null);
|
""", null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Disabled
|
||||||
|
public void testCqlExamples() throws IOException {
|
||||||
|
File folderPath = new File("src/test/resources/cql3_examples");
|
||||||
|
for (final File file : Objects.requireNonNull(folderPath.listFiles())) {
|
||||||
|
String query = Files.readString(Path.of(file.getPath()));
|
||||||
|
CqlModelParser.parse(query, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Disabled
|
||||||
|
@Test
|
||||||
|
public void testUdt() {
|
||||||
|
CGWorkloadExporter exporter = new CGWorkloadExporter();
|
||||||
|
exporter.applyAsInt(new String[] {"src/test/resources/testschemas/cql_udt.cql", "cql_udt.yaml"});
|
||||||
|
exporter.setNamingTemplate("[OPTYPE-][COLUMN-][TYPEDEF-][TABLE-]-[KEYSPACE]");
|
||||||
|
exporter.getWorkloadAsYaml();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,27 @@
|
|||||||
|
CREATE KEYSPACE baselines
|
||||||
|
WITH REPLICATION = {
|
||||||
|
'class' : 'SimpleStrategy',
|
||||||
|
'replication_factor' : 1
|
||||||
|
};
|
||||||
|
|
||||||
|
CREATE TYPE baselines.phone (
|
||||||
|
country_code int,
|
||||||
|
number text,
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TYPE baselines.address (
|
||||||
|
street text,
|
||||||
|
city text,
|
||||||
|
zip text,
|
||||||
|
phones map<text, phone>
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE baselines.user (
|
||||||
|
name text PRIMARY KEY,
|
||||||
|
addresses map<text, frozen<address>>
|
||||||
|
);
|
||||||
|
|
||||||
|
ALTER TYPE baselines.address RENAME zip TO zipcode AND city to city_code;
|
||||||
|
ALTER TYPE baselines.address ADD country text;
|
||||||
|
|
||||||
|
DROP TYPE IF EXISTS baselines.address;
|
@ -43,13 +43,22 @@ public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp> {
|
|||||||
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
|
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
|
||||||
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
|
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
|
||||||
return switch (typeAndTarget.enumId) {
|
return switch (typeAndTarget.enumId) {
|
||||||
case autocommit -> new Neo4JAutoCommitOpDispenser(
|
case sync_autocommit -> new Neo4JSyncAutoCommitOpDispenser(
|
||||||
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
||||||
);
|
);
|
||||||
case read_transaction -> new Neo4JReadTxnOpDispenser(
|
case async_autocommit -> new Neo4JAsyncAutoCommitOpDispenser(
|
||||||
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
||||||
);
|
);
|
||||||
case write_transaction -> new Neo4JWriteTxnOpDispenser(
|
case sync_read_transaction -> new Neo4JSyncReadTxnOpDispenser(
|
||||||
|
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
||||||
|
);
|
||||||
|
case async_read_transaction -> new Neo4JAsyncReadTxnOpDispenser(
|
||||||
|
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
||||||
|
);
|
||||||
|
case sync_write_transaction -> new Neo4JSyncWriteTxnOpDispenser(
|
||||||
|
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
||||||
|
);
|
||||||
|
case async_write_transaction -> new Neo4JAsyncWriteTxnOpDispenser(
|
||||||
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
|
||||||
);
|
);
|
||||||
};
|
};
|
||||||
|
@ -17,24 +17,24 @@
|
|||||||
package io.nosqlbench.adapter.neo4j.opdispensers;
|
package io.nosqlbench.adapter.neo4j.opdispensers;
|
||||||
|
|
||||||
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
||||||
|
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncAutoCommitOp;
|
||||||
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
||||||
import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp;
|
|
||||||
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
|
|
||||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
|
||||||
import org.neo4j.driver.async.AsyncSession;
|
import org.neo4j.driver.async.AsyncSession;
|
||||||
|
|
||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
|
||||||
public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
|
public class Neo4JAsyncAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
|
||||||
|
|
||||||
public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
public Neo4JAsyncAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
||||||
super(adapter, op, spaceFunc, requiredTemplateKey);
|
super(adapter, op, spaceFunc, requiredTemplateKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongFunction<Neo4JWriteTxnOp> createOpFunc() {
|
public LongFunction<Neo4JAsyncAutoCommitOp> createOpFunc() {
|
||||||
return l -> new Neo4JWriteTxnOp(
|
return l -> new Neo4JAsyncAutoCommitOp(
|
||||||
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
|
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
|
||||||
queryFunc.apply(l)
|
queryFunc.apply(l)
|
||||||
);
|
);
|
@ -17,8 +17,8 @@
|
|||||||
package io.nosqlbench.adapter.neo4j.opdispensers;
|
package io.nosqlbench.adapter.neo4j.opdispensers;
|
||||||
|
|
||||||
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
||||||
import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp;
|
|
||||||
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
||||||
|
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncReadTxnOp;
|
||||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
|
||||||
import org.neo4j.driver.async.AsyncSession;
|
import org.neo4j.driver.async.AsyncSession;
|
||||||
@ -26,15 +26,14 @@ import org.neo4j.driver.async.AsyncSession;
|
|||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
|
||||||
public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
|
public class Neo4JAsyncReadTxnOpDispenser extends Neo4JBaseOpDispenser {
|
||||||
|
public Neo4JAsyncReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
||||||
public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
|
||||||
super(adapter, op, spaceFunc, requiredTemplateKey);
|
super(adapter, op, spaceFunc, requiredTemplateKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongFunction<Neo4JAutoCommitOp> createOpFunc() {
|
public LongFunction<Neo4JAsyncReadTxnOp> createOpFunc() {
|
||||||
return l -> new Neo4JAutoCommitOp(
|
return l -> new Neo4JAsyncReadTxnOp(
|
||||||
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
|
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
|
||||||
queryFunc.apply(l)
|
queryFunc.apply(l)
|
||||||
);
|
);
|
@ -18,21 +18,23 @@ package io.nosqlbench.adapter.neo4j.opdispensers;
|
|||||||
|
|
||||||
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
||||||
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
||||||
import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp;
|
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncWriteTxnOp;
|
||||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
|
||||||
import org.neo4j.driver.async.AsyncSession;
|
import org.neo4j.driver.async.AsyncSession;
|
||||||
|
|
||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
|
||||||
public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser {
|
public class Neo4JAsyncWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
|
||||||
public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
|
||||||
|
public Neo4JAsyncWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
||||||
super(adapter, op, spaceFunc, requiredTemplateKey);
|
super(adapter, op, spaceFunc, requiredTemplateKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LongFunction<Neo4JReadTxnOp> createOpFunc() {
|
public LongFunction<Neo4JAsyncWriteTxnOp> createOpFunc() {
|
||||||
return l -> new Neo4JReadTxnOp(
|
return l -> new Neo4JAsyncWriteTxnOp(
|
||||||
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
|
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
|
||||||
queryFunc.apply(l)
|
queryFunc.apply(l)
|
||||||
);
|
);
|
@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.neo4j.opdispensers;
|
||||||
|
|
||||||
|
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
||||||
|
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncAutoCommitOp;
|
||||||
|
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
||||||
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Session;
|
||||||
|
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
|
||||||
|
public class Neo4JSyncAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
|
||||||
|
|
||||||
|
public Neo4JSyncAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
||||||
|
super(adapter, op, spaceFunc, requiredTemplateKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongFunction<Neo4JSyncAutoCommitOp> createOpFunc() {
|
||||||
|
return l -> new Neo4JSyncAutoCommitOp(
|
||||||
|
spaceFunc.apply(l).getDriver().session(Session.class),
|
||||||
|
queryFunc.apply(l)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.neo4j.opdispensers;
|
||||||
|
|
||||||
|
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
||||||
|
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncReadTxnOp;
|
||||||
|
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
||||||
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Session;
|
||||||
|
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
|
||||||
|
public class Neo4JSyncReadTxnOpDispenser extends Neo4JBaseOpDispenser {
|
||||||
|
|
||||||
|
public Neo4JSyncReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
||||||
|
super(adapter, op, spaceFunc, requiredTemplateKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongFunction<Neo4JSyncReadTxnOp> createOpFunc() {
|
||||||
|
return l -> new Neo4JSyncReadTxnOp(
|
||||||
|
spaceFunc.apply(l).getDriver().session(Session.class),
|
||||||
|
queryFunc.apply(l)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.neo4j.opdispensers;
|
||||||
|
|
||||||
|
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
|
||||||
|
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncWriteTxnOp;
|
||||||
|
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
|
||||||
|
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Session;
|
||||||
|
|
||||||
|
import java.util.function.LongFunction;
|
||||||
|
|
||||||
|
|
||||||
|
public class Neo4JSyncWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
|
||||||
|
|
||||||
|
public Neo4JSyncWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
|
||||||
|
super(adapter, op, spaceFunc, requiredTemplateKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LongFunction<Neo4JSyncWriteTxnOp> createOpFunc() {
|
||||||
|
return l -> new Neo4JSyncWriteTxnOp(
|
||||||
|
spaceFunc.apply(l).getDriver().session(Session.class),
|
||||||
|
queryFunc.apply(l)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
@ -26,10 +26,12 @@ import java.util.concurrent.ExecutionException;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class Neo4JAutoCommitOp extends Neo4JBaseOp {
|
public class Neo4JAsyncAutoCommitOp extends Neo4JBaseOp {
|
||||||
|
private final AsyncSession session;
|
||||||
|
|
||||||
public Neo4JAutoCommitOp(AsyncSession session, Query query) {
|
public Neo4JAsyncAutoCommitOp(AsyncSession session, Query query) {
|
||||||
super(session, query);
|
super(query);
|
||||||
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
@ -26,10 +26,12 @@ import java.util.concurrent.ExecutionException;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class Neo4JReadTxnOp extends Neo4JBaseOp{
|
public class Neo4JAsyncReadTxnOp extends Neo4JBaseOp{
|
||||||
|
private final AsyncSession session;
|
||||||
|
|
||||||
public Neo4JReadTxnOp(AsyncSession session, Query query) {
|
public Neo4JAsyncReadTxnOp(AsyncSession session, Query query) {
|
||||||
super(session, query);
|
super(query);
|
||||||
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
@ -26,10 +26,12 @@ import java.util.concurrent.ExecutionException;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class Neo4JWriteTxnOp extends Neo4JBaseOp{
|
public class Neo4JAsyncWriteTxnOp extends Neo4JBaseOp{
|
||||||
|
private final AsyncSession session;
|
||||||
|
|
||||||
public Neo4JWriteTxnOp(AsyncSession session, Query query) {
|
public Neo4JAsyncWriteTxnOp(AsyncSession session, Query query) {
|
||||||
super(session, query);
|
super(query);
|
||||||
|
this.session = session;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
@ -24,20 +24,17 @@ import org.neo4j.driver.async.AsyncSession;
|
|||||||
|
|
||||||
|
|
||||||
public abstract class Neo4JBaseOp implements CycleOp<Record[]> {
|
public abstract class Neo4JBaseOp implements CycleOp<Record[]> {
|
||||||
|
|
||||||
protected final AsyncSession session;
|
|
||||||
protected final Query query;
|
protected final Query query;
|
||||||
|
|
||||||
public Neo4JBaseOp(AsyncSession session, Query query) {
|
public Neo4JBaseOp(Query query) {
|
||||||
this.session = session;
|
|
||||||
this.query = query;
|
this.query = query;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In the child classes, this method will be responsible for:
|
* In the child classes, this method will be responsible for:
|
||||||
* - using the Neo4J AsyncSession object to run the Neo4J Query
|
* - using the Neo4J Session/AsyncSession object to run the Neo4J Query
|
||||||
* - process the Result to get an array of Records
|
* - process the Result to get an array of Records
|
||||||
* - close the AsyncSession
|
* - close the Session/AsyncSession
|
||||||
* - Return the array of Records
|
* - Return the array of Records
|
||||||
*
|
*
|
||||||
* Session creation and closing is considered light-weight. Reference:
|
* Session creation and closing is considered light-weight. Reference:
|
||||||
|
@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.neo4j.ops;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Query;
|
||||||
|
import org.neo4j.driver.Record;
|
||||||
|
import org.neo4j.driver.Session;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
public class Neo4JSyncAutoCommitOp extends Neo4JBaseOp {
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
public Neo4JSyncAutoCommitOp(Session session, Query query) {
|
||||||
|
super(query);
|
||||||
|
this.session = session;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final Record[] apply(long value) {
|
||||||
|
List<Record> recordList = session.run(query).list();
|
||||||
|
if (session.isOpen()) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
return recordList.toArray(new Record[recordList.size()]);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.neo4j.ops;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Query;
|
||||||
|
import org.neo4j.driver.Record;
|
||||||
|
import org.neo4j.driver.Session;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
|
public class Neo4JSyncReadTxnOp extends Neo4JBaseOp{
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
public Neo4JSyncReadTxnOp(Session session, Query query) {
|
||||||
|
super(query);
|
||||||
|
this.session = session;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final Record[] apply(long value) {
|
||||||
|
List<Record> recordList = session.executeRead(
|
||||||
|
txn -> {
|
||||||
|
var result = txn.run(query);
|
||||||
|
return result.list();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
if (session.isOpen()) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
return recordList.toArray(new Record[recordList.size()]);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.adapter.neo4j.ops;
|
||||||
|
|
||||||
|
import org.neo4j.driver.Query;
|
||||||
|
import org.neo4j.driver.Record;
|
||||||
|
import org.neo4j.driver.Session;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
public class Neo4JSyncWriteTxnOp extends Neo4JBaseOp{
|
||||||
|
private final Session session;
|
||||||
|
|
||||||
|
public Neo4JSyncWriteTxnOp(Session session, Query query) {
|
||||||
|
super(query);
|
||||||
|
this.session = session;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public final Record[] apply(long value) {
|
||||||
|
List<Record> recordList = session.executeWrite(
|
||||||
|
txn -> {
|
||||||
|
var result = txn.run(query);
|
||||||
|
return result.list();
|
||||||
|
}
|
||||||
|
);
|
||||||
|
if (session.isOpen()) {
|
||||||
|
session.close();
|
||||||
|
}
|
||||||
|
return recordList.toArray(new Record[recordList.size()]);
|
||||||
|
}
|
||||||
|
}
|
@ -18,11 +18,17 @@ package io.nosqlbench.adapter.neo4j.types;
|
|||||||
|
|
||||||
public enum Neo4JOpType {
|
public enum Neo4JOpType {
|
||||||
|
|
||||||
autocommit("autocommit"),
|
sync_autocommit("sync_autocommit"),
|
||||||
|
|
||||||
read_transaction("read_transaction"),
|
async_autocommit("async_autocommit"),
|
||||||
|
|
||||||
write_transaction("write_transaction");
|
sync_read_transaction("sync_read_transaction"),
|
||||||
|
|
||||||
|
async_read_transaction("async_read_transaction"),
|
||||||
|
|
||||||
|
sync_write_transaction("sync_write_transaction"),
|
||||||
|
|
||||||
|
async_write_transaction("async_write_transaction");
|
||||||
|
|
||||||
private final String value;
|
private final String value;
|
||||||
|
|
||||||
|
@ -24,7 +24,7 @@ blocks:
|
|||||||
ops:
|
ops:
|
||||||
# Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS
|
# Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS
|
||||||
delete_nodes:
|
delete_nodes:
|
||||||
autocommit: |
|
sync_autocommit: |
|
||||||
MATCH (n)
|
MATCH (n)
|
||||||
CALL { WITH n
|
CALL { WITH n
|
||||||
DETACH DELETE n
|
DETACH DELETE n
|
||||||
@ -32,14 +32,14 @@ blocks:
|
|||||||
query_params:
|
query_params:
|
||||||
delete_batch_size: TEMPLATE(delete_batch_size,5000)
|
delete_batch_size: TEMPLATE(delete_batch_size,5000)
|
||||||
drop_index:
|
drop_index:
|
||||||
autocommit: DROP INDEX $index_name IF EXISTS
|
sync_autocommit: DROP INDEX $index_name IF EXISTS
|
||||||
query_params:
|
query_params:
|
||||||
index_name: vector_index
|
index_name: vector_index
|
||||||
|
|
||||||
schema:
|
schema:
|
||||||
ops:
|
ops:
|
||||||
create_vector_index:
|
create_vector_index:
|
||||||
autocommit: |
|
sync_autocommit: |
|
||||||
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
|
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
|
||||||
ON (n.embedding) OPTIONS
|
ON (n.embedding) OPTIONS
|
||||||
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
|
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
|
||||||
@ -51,7 +51,7 @@ blocks:
|
|||||||
rampup:
|
rampup:
|
||||||
ops:
|
ops:
|
||||||
insert_node:
|
insert_node:
|
||||||
write_transaction: |
|
async_write_transaction: |
|
||||||
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
|
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
|
||||||
query_params:
|
query_params:
|
||||||
id: '{id}'
|
id: '{id}'
|
||||||
@ -61,7 +61,7 @@ blocks:
|
|||||||
ops:
|
ops:
|
||||||
# Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5
|
# Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5
|
||||||
insert_nodes:
|
insert_nodes:
|
||||||
write_transaction: |
|
async_write_transaction: |
|
||||||
WITH $id_list as ids, $vector_list as vectors
|
WITH $id_list as ids, $vector_list as vectors
|
||||||
UNWIND RANGE(0, size(ids) - 1) as idx
|
UNWIND RANGE(0, size(ids) - 1) as idx
|
||||||
CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]})
|
CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]})
|
||||||
@ -72,7 +72,7 @@ blocks:
|
|||||||
search:
|
search:
|
||||||
ops:
|
ops:
|
||||||
search:
|
search:
|
||||||
read_transaction: |
|
async_read_transaction: |
|
||||||
WITH $query_vector AS queryVector
|
WITH $query_vector AS queryVector
|
||||||
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
|
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
|
||||||
YIELD node
|
YIELD node
|
||||||
|
@ -13,10 +13,13 @@ instance of the Neo4J/Aura database:
|
|||||||
|
|
||||||
## Op Templates
|
## Op Templates
|
||||||
|
|
||||||
The Neo4J adapter supports three different op types:
|
The Neo4J adapter supports six different op types:
|
||||||
- autocommit
|
- sync_autocommit
|
||||||
- read_transaction
|
- async_autocommit
|
||||||
- write_transaction
|
- sync_read_transaction
|
||||||
|
- async_read_transaction
|
||||||
|
- sync_write_transaction
|
||||||
|
- async_write_transaction
|
||||||
|
|
||||||
A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/
|
A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/
|
||||||
|
|
||||||
@ -32,7 +35,7 @@ vector search functionality has been properly worked through, currently.
|
|||||||
```yaml
|
```yaml
|
||||||
ops:
|
ops:
|
||||||
example_create_vector_index:
|
example_create_vector_index:
|
||||||
autocommit: |
|
sync_autocommit: |
|
||||||
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
|
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
|
||||||
ON (n.embedding) OPTIONS
|
ON (n.embedding) OPTIONS
|
||||||
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
|
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
|
||||||
@ -42,14 +45,14 @@ ops:
|
|||||||
similarity_function: TEMPLATE(similarity_function,cosine)
|
similarity_function: TEMPLATE(similarity_function,cosine)
|
||||||
|
|
||||||
example_insert_node:
|
example_insert_node:
|
||||||
write_transaction: |
|
async_write_transaction: |
|
||||||
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
|
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
|
||||||
query_params:
|
query_params:
|
||||||
id: '{id}'
|
id: '{id}'
|
||||||
vector: '{train_vector}'
|
vector: '{train_vector}'
|
||||||
|
|
||||||
example_search:
|
example_search:
|
||||||
read_transaction: |
|
async_read_transaction: |
|
||||||
WITH $query_vector AS queryVector
|
WITH $query_vector AS queryVector
|
||||||
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
|
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
|
||||||
YIELD node
|
YIELD node
|
||||||
|
@ -37,7 +37,7 @@
|
|||||||
</description>
|
</description>
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<s4j.version>4.0.1</s4j.version>
|
<s4j.version>4.1.2-alpha</s4j.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -99,8 +99,9 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
// Keep track the transaction count per thread
|
// Keep track the transaction count per thread
|
||||||
private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
|
private final ThreadLocal<Integer> txnBatchTrackingCnt = ThreadLocal.withInitial(() -> 0);
|
||||||
|
|
||||||
// Represents the JMS connection
|
// Instead of using one "physical" connection per NB process,
|
||||||
private PulsarConnectionFactory s4jConnFactory;
|
// allows creating multiple connections to the Pulsar broker via the "num_conn" parameter
|
||||||
|
private final ConcurrentHashMap<String, PulsarConnectionFactory> connFactories = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
private long totalCycleNum;
|
private long totalCycleNum;
|
||||||
|
|
||||||
@ -125,6 +126,7 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
|
this.sessionMode = S4JAdapterUtil.getSessionModeFromStr(
|
||||||
cfg.getOptional("session_mode").orElse(""));
|
cfg.getOptional("session_mode").orElse(""));
|
||||||
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
|
this.s4JClientConf = new S4JClientConf(webSvcUrl, pulsarSvcUrl, s4jClientConfFileName);
|
||||||
|
logger.info("{}", s4JClientConf.toString());
|
||||||
|
|
||||||
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
|
this.setS4JActivityStartTimeMills(System.currentTimeMillis());
|
||||||
|
|
||||||
@ -217,22 +219,22 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
|
|
||||||
public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }
|
public long incTotalNullMsgRecvdCnt() { return nullMsgRecvCnt.incrementAndGet(); }
|
||||||
|
|
||||||
public PulsarConnectionFactory getS4jConnFactory() { return s4jConnFactory; }
|
|
||||||
|
|
||||||
public long getTotalCycleNum() { return totalCycleNum; }
|
public long getTotalCycleNum() { return totalCycleNum; }
|
||||||
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
|
public void setTotalCycleNum(long cycleNum) { totalCycleNum = cycleNum; }
|
||||||
|
|
||||||
public void initializeSpace(S4JClientConf s4JClientConnInfo) {
|
public void initializeSpace(S4JClientConf s4JClientConnInfo) {
|
||||||
if (s4jConnFactory == null) {
|
|
||||||
Map<String, Object> cfgMap;
|
Map<String, Object> cfgMap;
|
||||||
try {
|
try {
|
||||||
cfgMap = s4JClientConnInfo.getS4jConfObjMap();
|
cfgMap = s4JClientConnInfo.getS4jConfObjMap();
|
||||||
s4jConnFactory = new PulsarConnectionFactory(cfgMap);
|
|
||||||
|
|
||||||
for (int i=0; i<getMaxNumConn(); i++) {
|
for (int i=0; i<getMaxNumConn(); i++) {
|
||||||
// Establish a JMS connection
|
|
||||||
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);
|
String connLvlJmsConnContextIdStr =getConnLvlJmsContextIdentifier(i);
|
||||||
|
|
||||||
|
// Establish a JMS connection
|
||||||
|
PulsarConnectionFactory s4jConnFactory = connFactories.computeIfAbsent(
|
||||||
|
connLvlJmsConnContextIdStr,
|
||||||
|
__ -> new PulsarConnectionFactory(cfgMap));
|
||||||
|
|
||||||
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
|
JMSContext jmsConnContext = getOrCreateConnLvlJMSContext(s4jConnFactory, s4JClientConnInfo, sessionMode);
|
||||||
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
|
jmsConnContext.setClientID(connLvlJmsConnContextIdStr);
|
||||||
jmsConnContext.setExceptionListener(e -> {
|
jmsConnContext.setExceptionListener(e -> {
|
||||||
@ -253,10 +255,6 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
|
logger.error("Unable to initialize JMS connection factory with the following configuration parameters: {}", s4JClientConnInfo.toString());
|
||||||
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
|
throw new S4JAdapterUnexpectedException("Unable to initialize JMS connection factory with the following error message: " + e.getCause());
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdownSpace() {
|
public void shutdownSpace() {
|
||||||
@ -284,7 +282,9 @@ public class S4JSpace implements AutoCloseable {
|
|||||||
if (jmsContext != null) jmsContext.close();
|
if (jmsContext != null) jmsContext.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
s4jConnFactory.close();
|
for (PulsarConnectionFactory s4jConnFactory : connFactories.values()) {
|
||||||
|
if (s4jConnFactory != null) s4jConnFactory.close();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
catch (Exception ex) {
|
catch (Exception ex) {
|
||||||
String exp = "Unexpected error when shutting down the S4J adaptor space";
|
String exp = "Unexpected error when shutting down the S4J adaptor space";
|
||||||
|
@ -40,11 +40,13 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
|||||||
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
|
private final static Logger logger = LogManager.getLogger("MessageProducerOpDispenser");
|
||||||
|
|
||||||
public static final String MSG_HEADER_OP_PARAM = "msg_header";
|
public static final String MSG_HEADER_OP_PARAM = "msg_header";
|
||||||
|
public static final String MSG_PRIORITY_OP_PARAM = "msg_priority";
|
||||||
public static final String MSG_PROP_OP_PARAM = "msg_property";
|
public static final String MSG_PROP_OP_PARAM = "msg_property";
|
||||||
public static final String MSG_BODY_OP_PARAM = "msg_body";
|
public static final String MSG_BODY_OP_PARAM = "msg_body";
|
||||||
public static final String MSG_TYPE_OP_PARAM = "msg_type";
|
public static final String MSG_TYPE_OP_PARAM = "msg_type";
|
||||||
|
|
||||||
private final LongFunction<String> msgHeaderRawJsonStrFunc;
|
private final LongFunction<String> msgHeaderRawJsonStrFunc;
|
||||||
|
private final LongFunction<String> msgPriorityStrFunc;
|
||||||
private final LongFunction<String> msgPropRawJsonStrFunc;
|
private final LongFunction<String> msgPropRawJsonStrFunc;
|
||||||
private final LongFunction<String> msgBodyRawJsonStrFunc;
|
private final LongFunction<String> msgBodyRawJsonStrFunc;
|
||||||
private final LongFunction<String> msgTypeFunc;
|
private final LongFunction<String> msgTypeFunc;
|
||||||
@ -56,6 +58,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
|||||||
super(adapter, op, tgtNameFunc, s4jSpace);
|
super(adapter, op, tgtNameFunc, s4jSpace);
|
||||||
|
|
||||||
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
|
this.msgHeaderRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_HEADER_OP_PARAM);
|
||||||
|
this.msgPriorityStrFunc = lookupOptionalStrOpValueFunc(MSG_PRIORITY_OP_PARAM);
|
||||||
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
|
this.msgPropRawJsonStrFunc = lookupOptionalStrOpValueFunc(MSG_PROP_OP_PARAM);
|
||||||
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
|
this.msgBodyRawJsonStrFunc = lookupMandtoryStrOpValueFunc(MSG_BODY_OP_PARAM);
|
||||||
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
|
this.msgTypeFunc = lookupOptionalStrOpValueFunc(MSG_TYPE_OP_PARAM);
|
||||||
@ -272,6 +275,7 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
|||||||
public MessageProducerOp getOp(long cycle) {
|
public MessageProducerOp getOp(long cycle) {
|
||||||
String destName = destNameStrFunc.apply(cycle);
|
String destName = destNameStrFunc.apply(cycle);
|
||||||
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
|
String jmsMsgHeaderRawJsonStr = msgHeaderRawJsonStrFunc.apply(cycle);
|
||||||
|
String jmsMsgPriorityStr = msgPriorityStrFunc.apply(cycle);
|
||||||
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
|
String jmsMsgPropertyRawJsonStr = msgPropRawJsonStrFunc.apply(cycle);
|
||||||
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);
|
String jmsMsgBodyRawJsonStr = msgBodyRawJsonStrFunc.apply(cycle);
|
||||||
|
|
||||||
@ -294,6 +298,9 @@ public class MessageProducerOpDispenser extends S4JBaseOpDispenser {
|
|||||||
JMSProducer producer;
|
JMSProducer producer;
|
||||||
try {
|
try {
|
||||||
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI);
|
producer = getJmsProducer(s4JJMSContextWrapper, asyncAPI);
|
||||||
|
int priority = NumberUtils.toInt(jmsMsgPriorityStr);
|
||||||
|
assert (priority >= 0 && priority <= 9);
|
||||||
|
producer.setPriority(priority);
|
||||||
}
|
}
|
||||||
catch (JMSException jmsException) {
|
catch (JMSException jmsException) {
|
||||||
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");
|
throw new S4JAdapterUnexpectedException("Unable to create the JMS producer!");
|
||||||
|
@ -225,7 +225,7 @@ public abstract class S4JBaseOpDispenser extends BaseOpDispenser<S4JOp, S4JSpac
|
|||||||
String destType,
|
String destType,
|
||||||
String destName) throws JMSRuntimeException
|
String destName) throws JMSRuntimeException
|
||||||
{
|
{
|
||||||
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifer();
|
String jmsContextIdStr = s4JJMSContextWrapper.getJmsContextIdentifier();
|
||||||
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
|
JMSContext jmsContext = s4JJMSContextWrapper.getJmsContext();
|
||||||
|
|
||||||
S4JSpace.JMSDestinationCacheKey destinationCacheKey =
|
S4JSpace.JMSDestinationCacheKey destinationCacheKey =
|
||||||
|
@ -205,6 +205,30 @@ public class S4JAdapterUtil {
|
|||||||
return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", ");
|
return StringUtils.join(JMS_MESSAGE_TYPES.LABELS, ", ");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Message compression types
|
||||||
|
public enum MSG_COMPRESSION_TYPE_STR {
|
||||||
|
LZ4("LZ4"),
|
||||||
|
ZSTD("ZSTD"),
|
||||||
|
ZLIB("ZLIB"),
|
||||||
|
SNAPPY("SNAPPY");
|
||||||
|
public final String label;
|
||||||
|
MSG_COMPRESSION_TYPE_STR(String label) {
|
||||||
|
this.label = label;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Set<String> LABELS = Stream.of(values()).map(v -> v.label)
|
||||||
|
.collect(Collectors.toUnmodifiableSet());
|
||||||
|
private static boolean isValidLabel(String label) {
|
||||||
|
return LABELS.contains(StringUtils.upperCase(label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
public static String getValidMsgCompressionTypeList() {
|
||||||
|
return StringUtils.join(MSG_COMPRESSION_TYPE_STR.LABELS, ", ");
|
||||||
|
}
|
||||||
|
public static boolean isValidMsgCompressionTypeStr(String type) {
|
||||||
|
return MSG_COMPRESSION_TYPE_STR.isValidLabel(type);
|
||||||
|
}
|
||||||
|
|
||||||
///////
|
///////
|
||||||
// Convert JSON string to a key/value map
|
// Convert JSON string to a key/value map
|
||||||
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
|
public static Map<String, String> convertJsonToMap(String jsonStr) throws Exception {
|
||||||
|
@ -22,6 +22,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
|||||||
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
|
import io.nosqlbench.adapter.s4j.exception.S4JAdapterInvalidParamException;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.commons.lang3.math.NumberUtils;
|
import org.apache.commons.lang3.math.NumberUtils;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -35,6 +37,8 @@ import java.util.Map;
|
|||||||
*/
|
*/
|
||||||
public class S4JClientConfConverter {
|
public class S4JClientConfConverter {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(S4JClientConfConverter.class);
|
||||||
|
|
||||||
public static Map<String, Object> convertRawClientConf(Map<String, String> pulsarClientConfMapRaw) {
|
public static Map<String, Object> convertRawClientConf(Map<String, String> pulsarClientConfMapRaw) {
|
||||||
Map<String, Object> s4jClientConfObjMap = new HashMap<>();
|
Map<String, Object> s4jClientConfObjMap = new HashMap<>();
|
||||||
s4jClientConfObjMap.putAll(pulsarClientConfMapRaw);
|
s4jClientConfObjMap.putAll(pulsarClientConfMapRaw);
|
||||||
@ -71,34 +75,31 @@ public class S4JClientConfConverter {
|
|||||||
/**
|
/**
|
||||||
* Non-primitive type processing for Pulsar producer configuration items
|
* Non-primitive type processing for Pulsar producer configuration items
|
||||||
*/
|
*/
|
||||||
// "compressionType" has value type "CompressionType"
|
|
||||||
// - expecting the following values: 'LZ4', 'ZLIB', 'ZSTD', 'SNAPPY'
|
|
||||||
String confKeyName = "compressionType";
|
String confKeyName = "compressionType";
|
||||||
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
|
String confVal = pulsarProducerConfMapRaw.get(confKeyName);
|
||||||
String expectedVal = "(LZ4|ZLIB|ZSTD|SNAPPY)";
|
|
||||||
|
|
||||||
if (StringUtils.isNotBlank(confVal)) {
|
|
||||||
if (StringUtils.equalsAnyIgnoreCase(confVal, "LZ4", "ZLIB", "ZSTD", "SNAPPY")) {
|
|
||||||
CompressionType compressionType = CompressionType.NONE;
|
CompressionType compressionType = CompressionType.NONE;
|
||||||
|
if ( StringUtils.isNotBlank(confVal) ) {
|
||||||
switch (StringUtils.upperCase(confVal)) {
|
try {
|
||||||
case "LZ4":
|
S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR compressionTypeStr =
|
||||||
compressionType = CompressionType.LZ4;
|
S4JAdapterUtil.MSG_COMPRESSION_TYPE_STR.valueOf(confVal);
|
||||||
case "ZLIB":
|
compressionType = switch (compressionTypeStr) {
|
||||||
compressionType = CompressionType.ZLIB;
|
case LZ4 -> CompressionType.LZ4;
|
||||||
case "ZSTD":
|
case ZLIB -> CompressionType.ZLIB;
|
||||||
compressionType = CompressionType.ZSTD;
|
case ZSTD -> CompressionType.ZSTD;
|
||||||
case "SNAPPY":
|
case SNAPPY -> CompressionType.SNAPPY;
|
||||||
compressionType = CompressionType.SNAPPY;
|
};
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
// Any invalid value will be treated as no compression
|
||||||
|
logger.warn("Invalid producer \"compressionType\" value ({}) in the config properties file. " +
|
||||||
|
"Expecting one of the following values: {}. " +
|
||||||
|
"No message compression will be applied (for producers).",
|
||||||
|
confVal,
|
||||||
|
S4JAdapterUtil.getValidMsgCompressionTypeList());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s4jProducerConfObjMap.put(confKeyName, compressionType);
|
s4jProducerConfObjMap.put(confKeyName, compressionType);
|
||||||
} else {
|
|
||||||
throw new S4JAdapterInvalidParamException(
|
|
||||||
getInvalidConfValStr(confKeyName, confVal, "producer", expectedVal));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Skip the following Pulsar configuration items for now because they're not really
|
// TODO: Skip the following Pulsar configuration items for now because they're not really
|
||||||
// needed in the NB S4J testing at the moment. Add support for them when needed.
|
// needed in the NB S4J testing at the moment. Add support for them when needed.
|
||||||
// * messageRoutingMode
|
// * messageRoutingMode
|
||||||
@ -312,7 +313,9 @@ public class S4JClientConfConverter {
|
|||||||
Map.entry("jms.usePulsarAdmin","boolean"),
|
Map.entry("jms.usePulsarAdmin","boolean"),
|
||||||
Map.entry("jms.useServerSideFiltering","boolean"),
|
Map.entry("jms.useServerSideFiltering","boolean"),
|
||||||
Map.entry("jms.waitForServerStartupTimeout","int"),
|
Map.entry("jms.waitForServerStartupTimeout","int"),
|
||||||
Map.entry("jms.transactionsStickyPartitions", "boolean")
|
Map.entry("jms.transactionsStickyPartitions", "boolean"),
|
||||||
|
Map.entry("jms.enableJMSPriority","boolean"),
|
||||||
|
Map.entry("jms.priorityMapping","String")
|
||||||
);
|
);
|
||||||
public static Map<String, Object> convertRawJmsConf(Map<String, String> s4jJmsConfMapRaw) {
|
public static Map<String, Object> convertRawJmsConf(Map<String, String> s4jJmsConfMapRaw) {
|
||||||
Map<String, Object> s4jJmsConfObjMap = new HashMap<>();
|
Map<String, Object> s4jJmsConfObjMap = new HashMap<>();
|
||||||
|
@ -22,19 +22,19 @@ import javax.jms.Session;
|
|||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
public class S4JJMSContextWrapper {
|
public class S4JJMSContextWrapper {
|
||||||
private final String jmsContextIdentifer;
|
private final String jmsContextIdentifier;
|
||||||
private final JMSContext jmsContext;
|
private final JMSContext jmsContext;
|
||||||
private final int jmsSessionMode;
|
private final int jmsSessionMode;
|
||||||
|
|
||||||
public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) {
|
public S4JJMSContextWrapper(String identifer, JMSContext jmsContext) {
|
||||||
this.jmsContextIdentifer = identifer;
|
this.jmsContextIdentifier = identifer;
|
||||||
this.jmsContext = jmsContext;
|
this.jmsContext = jmsContext;
|
||||||
this.jmsSessionMode = jmsContext.getSessionMode();
|
this.jmsSessionMode = jmsContext.getSessionMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getJmsSessionMode() { return jmsSessionMode; }
|
public int getJmsSessionMode() { return jmsSessionMode; }
|
||||||
public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); }
|
public boolean isTransactedMode() { return Session.SESSION_TRANSACTED == this.getJmsSessionMode(); }
|
||||||
public String getJmsContextIdentifer() { return jmsContextIdentifer; }
|
public String getJmsContextIdentifier() { return jmsContextIdentifier; }
|
||||||
public JMSContext getJmsContext() { return jmsContext; }
|
public JMSContext getJmsContext() { return jmsContext; }
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
@ -45,7 +45,7 @@ public class S4JJMSContextWrapper {
|
|||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return new ToStringBuilder(this).
|
return new ToStringBuilder(this).
|
||||||
append("jmsContextIdentifer", jmsContextIdentifer).
|
append("jmsContextIdentifier", jmsContextIdentifier).
|
||||||
append("jmsContext", jmsContext.toString()).
|
append("jmsContext", jmsContext.toString()).
|
||||||
toString();
|
toString();
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,7 @@ bindings:
|
|||||||
mymap_val1: AlphaNumericString(10)
|
mymap_val1: AlphaNumericString(10)
|
||||||
mymap_val2: AlphaNumericString(20)
|
mymap_val2: AlphaNumericString(20)
|
||||||
mystream_val1: AlphaNumericString(50)
|
mystream_val1: AlphaNumericString(50)
|
||||||
|
my_priority: WeightedLongs('2:20;4:70;8:10')
|
||||||
|
|
||||||
# document level parameters that apply to all Pulsar client types:
|
# document level parameters that apply to all Pulsar client types:
|
||||||
params:
|
params:
|
||||||
@ -25,6 +26,12 @@ blocks:
|
|||||||
"JMSPriority": "9"
|
"JMSPriority": "9"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
## (Optional) S4J Message priority emulation (since Pulsar doesn't have native message priority)
|
||||||
|
# - jms.enableJMSPriority must be set to true in S4J configuration;
|
||||||
|
# otherwise, the priority value will be ignored.
|
||||||
|
# - If this is set, the "JMSPriority" value in the header will be ignored.
|
||||||
|
msg_priority: "{my_priority}"
|
||||||
|
|
||||||
## (Optional) JMS properties, predefined or customized (in JSON format).
|
## (Optional) JMS properties, predefined or customized (in JSON format).
|
||||||
msg_property: |
|
msg_property: |
|
||||||
{
|
{
|
||||||
|
@ -22,7 +22,12 @@ import org.apache.logging.log4j.Logger;
|
|||||||
|
|
||||||
import java.security.InvalidParameterException;
|
import java.security.InvalidParameterException;
|
||||||
|
|
||||||
public record CyclesSpec(long first_inclusive, long last_exclusive, String firstSpec, String lastSpec) {
|
public record CyclesSpec(
|
||||||
|
long first_inclusive,
|
||||||
|
long last_exclusive,
|
||||||
|
String firstSpec,
|
||||||
|
String lastSpec
|
||||||
|
) {
|
||||||
private final static Logger logger = LogManager.getLogger(CyclesSpec.class);
|
private final static Logger logger = LogManager.getLogger(CyclesSpec.class);
|
||||||
public CyclesSpec {
|
public CyclesSpec {
|
||||||
if (first_inclusive>last_exclusive) {
|
if (first_inclusive>last_exclusive) {
|
||||||
@ -32,16 +37,24 @@ public record CyclesSpec(long first_inclusive, long last_exclusive, String first
|
|||||||
|
|
||||||
public static CyclesSpec parse(String spec) {
|
public static CyclesSpec parse(String spec) {
|
||||||
int rangeAt = spec.indexOf("..");
|
int rangeAt = spec.indexOf("..");
|
||||||
String beginningInclusive = "0";
|
String beginningSpec = "0";
|
||||||
String endingExclusive = spec;
|
String endingSpec = spec;
|
||||||
if (0 < rangeAt) {
|
if (0 < rangeAt) {
|
||||||
beginningInclusive = spec.substring(0, rangeAt);
|
beginningSpec = spec.substring(0, rangeAt);
|
||||||
endingExclusive = spec.substring(rangeAt+2);
|
endingSpec = spec.substring(rangeAt+2);
|
||||||
|
}
|
||||||
|
long first = Unit.longCountFor(beginningSpec).orElseThrow(() -> new RuntimeException("Unable to parse start cycles from " + spec));
|
||||||
|
long last=first;
|
||||||
|
if (endingSpec.startsWith("+")) {
|
||||||
|
long added=Unit.longCountFor(endingSpec.substring(1)).orElseThrow(() -> new RuntimeException(
|
||||||
|
"Unable to parse incremental cycle interval. Use one of these forms: 100 or 0..100 or 0..+100"
|
||||||
|
));
|
||||||
|
last = first+added;
|
||||||
|
} else {
|
||||||
|
last = Unit.longCountFor(endingSpec).orElseThrow(() -> new RuntimeException("Unable to parse start cycles from " + spec));
|
||||||
}
|
}
|
||||||
long first = Unit.longCountFor(beginningInclusive).orElseThrow(() -> new RuntimeException("Unable to parse start cycles from " + spec));
|
|
||||||
long last = Unit.longCountFor(endingExclusive).orElseThrow(() -> new RuntimeException("Unable to parse start cycles from " + spec));
|
|
||||||
|
|
||||||
return new CyclesSpec(first, last, beginningInclusive, endingExclusive);
|
return new CyclesSpec(first, last, beginningSpec, endingSpec);
|
||||||
}
|
}
|
||||||
|
|
||||||
public String summary() {
|
public String summary() {
|
||||||
@ -74,4 +87,30 @@ public record CyclesSpec(long first_inclusive, long last_exclusive, String first
|
|||||||
public long cycle_count() {
|
public long cycle_count() {
|
||||||
return last_exclusive -first_inclusive;
|
return last_exclusive -first_inclusive;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o) {
|
||||||
|
if (this == o)
|
||||||
|
return true;
|
||||||
|
if (!(o instanceof CyclesSpec that))
|
||||||
|
return false;
|
||||||
|
if (last_exclusive!=that.last_exclusive)
|
||||||
|
return false;
|
||||||
|
if (first_inclusive!=that.first_inclusive)
|
||||||
|
return false;
|
||||||
|
if (!firstSpec.equals(that.firstSpec))
|
||||||
|
return false;
|
||||||
|
if (!lastSpec.equals(that.lastSpec))
|
||||||
|
return false;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int result = Long.hashCode(first_inclusive);
|
||||||
|
result = 31 * result + Long.hashCode(last_exclusive);
|
||||||
|
result = 31 * result + firstSpec.hashCode();
|
||||||
|
result = 31 * result + lastSpec.hashCode();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.nb.api.engine.activityimpl;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.junit.jupiter.api.Assertions.*;
|
||||||
|
|
||||||
|
public class CyclesSpecTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleNumberSpec() {
|
||||||
|
assertThat(CyclesSpec.parse("100")).isEqualTo(
|
||||||
|
new CyclesSpec(0,100,"0","100")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIntervalSpec() {
|
||||||
|
assertThat(CyclesSpec.parse("3..13")).isEqualTo(
|
||||||
|
new CyclesSpec(3,13,"3","13")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnitsSupport() {
|
||||||
|
assertThat(CyclesSpec.parse("5M")).isEqualTo(
|
||||||
|
new CyclesSpec(0,5_000_000L,"0","5M")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncrementalSupport() {
|
||||||
|
assertThat(CyclesSpec.parse("5M..+100")).isEqualTo(
|
||||||
|
new CyclesSpec(5_000_000L,5_000_100L,"5M","+100")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIncrementalUnitsSupport() {
|
||||||
|
assertThat(CyclesSpec.parse("5M..+1M")).isEqualTo(
|
||||||
|
new CyclesSpec(5_000_000L,6_000_000L,"5M","+1M")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user