Merge pull request #1939 from nosqlbench/neo4j_synchronous

Add synchronous analogues of autocommit, write_transaction, read_transaction ops to Neo4J driver adapter
This commit is contained in:
Jonathan Shook
2024-05-07 12:07:34 -07:00
committed by GitHub
17 changed files with 335 additions and 51 deletions

View File

@@ -43,13 +43,22 @@ public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp> {
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
return switch (typeAndTarget.enumId) {
case autocommit -> new Neo4JAutoCommitOpDispenser(
case sync_autocommit -> new Neo4JSyncAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case read_transaction -> new Neo4JReadTxnOpDispenser(
case async_autocommit -> new Neo4JAsyncAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case write_transaction -> new Neo4JWriteTxnOpDispenser(
case sync_read_transaction -> new Neo4JSyncReadTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case async_read_transaction -> new Neo4JAsyncReadTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case sync_write_transaction -> new Neo4JSyncWriteTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case async_write_transaction -> new Neo4JAsyncWriteTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
};

View File

@@ -17,24 +17,24 @@
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
public class Neo4JAsyncAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
public Neo4JAsyncAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JWriteTxnOp> createOpFunc() {
return l -> new Neo4JWriteTxnOp(
public LongFunction<Neo4JAsyncAutoCommitOp> createOpFunc() {
return l -> new Neo4JAsyncAutoCommitOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);

View File

@@ -17,8 +17,8 @@
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncReadTxnOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
@@ -26,15 +26,14 @@ import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
public class Neo4JAsyncReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JAsyncReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JAutoCommitOp> createOpFunc() {
return l -> new Neo4JAutoCommitOp(
public LongFunction<Neo4JAsyncReadTxnOp> createOpFunc() {
return l -> new Neo4JAsyncReadTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);

View File

@@ -18,21 +18,23 @@ package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAsyncWriteTxnOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
public class Neo4JAsyncWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JAsyncWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JReadTxnOp> createOpFunc() {
return l -> new Neo4JReadTxnOp(
public LongFunction<Neo4JAsyncWriteTxnOp> createOpFunc() {
return l -> new Neo4JAsyncWriteTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.Session;
import java.util.function.LongFunction;
public class Neo4JSyncAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JSyncAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JSyncAutoCommitOp> createOpFunc() {
return l -> new Neo4JSyncAutoCommitOp(
spaceFunc.apply(l).getDriver().session(Session.class),
queryFunc.apply(l)
);
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncReadTxnOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.Session;
import java.util.function.LongFunction;
public class Neo4JSyncReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JSyncReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JSyncReadTxnOp> createOpFunc() {
return l -> new Neo4JSyncReadTxnOp(
spaceFunc.apply(l).getDriver().session(Session.class),
queryFunc.apply(l)
);
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JSyncWriteTxnOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.Session;
import java.util.function.LongFunction;
public class Neo4JSyncWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JSyncWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JSyncWriteTxnOp> createOpFunc() {
return l -> new Neo4JSyncWriteTxnOp(
spaceFunc.apply(l).getDriver().session(Session.class),
queryFunc.apply(l)
);
}
}

View File

@@ -26,10 +26,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JAutoCommitOp extends Neo4JBaseOp {
public class Neo4JAsyncAutoCommitOp extends Neo4JBaseOp {
private final AsyncSession session;
public Neo4JAutoCommitOp(AsyncSession session, Query query) {
super(session, query);
public Neo4JAsyncAutoCommitOp(AsyncSession session, Query query) {
super(query);
this.session = session;
}
/**

View File

@@ -26,10 +26,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JReadTxnOp extends Neo4JBaseOp{
public class Neo4JAsyncReadTxnOp extends Neo4JBaseOp{
private final AsyncSession session;
public Neo4JReadTxnOp(AsyncSession session, Query query) {
super(session, query);
public Neo4JAsyncReadTxnOp(AsyncSession session, Query query) {
super(query);
this.session = session;
}
/**

View File

@@ -26,10 +26,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JWriteTxnOp extends Neo4JBaseOp{
public class Neo4JAsyncWriteTxnOp extends Neo4JBaseOp{
private final AsyncSession session;
public Neo4JWriteTxnOp(AsyncSession session, Query query) {
super(session, query);
public Neo4JAsyncWriteTxnOp(AsyncSession session, Query query) {
super(query);
this.session = session;
}
/**

View File

@@ -24,20 +24,17 @@ import org.neo4j.driver.async.AsyncSession;
public abstract class Neo4JBaseOp implements CycleOp<Record[]> {
protected final AsyncSession session;
protected final Query query;
public Neo4JBaseOp(AsyncSession session, Query query) {
this.session = session;
public Neo4JBaseOp(Query query) {
this.query = query;
}
/**
* In the child classes, this method will be responsible for:
* - using the Neo4J AsyncSession object to run the Neo4J Query
* - using the Neo4J Session/AsyncSession object to run the Neo4J Query
* - process the Result to get an array of Records
* - close the AsyncSession
* - close the Session/AsyncSession
* - Return the array of Records
*
* Session creation and closing is considered light-weight. Reference:

View File

@@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import java.util.List;
public class Neo4JSyncAutoCommitOp extends Neo4JBaseOp {
private final Session session;
public Neo4JSyncAutoCommitOp(Session session, Query query) {
super(query);
this.session = session;
}
@Override
public final Record[] apply(long value) {
List<Record> recordList = session.run(query).list();
if (session.isOpen()) {
session.close();
}
return recordList.toArray(new Record[recordList.size()]);
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import java.util.List;
public class Neo4JSyncReadTxnOp extends Neo4JBaseOp{
private final Session session;
public Neo4JSyncReadTxnOp(Session session, Query query) {
super(query);
this.session = session;
}
@Override
public final Record[] apply(long value) {
List<Record> recordList = session.executeRead(
txn -> {
var result = txn.run(query);
return result.list();
}
);
if (session.isOpen()) {
session.close();
}
return recordList.toArray(new Record[recordList.size()]);
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Session;
import java.util.List;
public class Neo4JSyncWriteTxnOp extends Neo4JBaseOp{
private final Session session;
public Neo4JSyncWriteTxnOp(Session session, Query query) {
super(query);
this.session = session;
}
@Override
public final Record[] apply(long value) {
List<Record> recordList = session.executeWrite(
txn -> {
var result = txn.run(query);
return result.list();
}
);
if (session.isOpen()) {
session.close();
}
return recordList.toArray(new Record[recordList.size()]);
}
}

View File

@@ -18,11 +18,17 @@ package io.nosqlbench.adapter.neo4j.types;
public enum Neo4JOpType {
autocommit("autocommit"),
sync_autocommit("sync_autocommit"),
read_transaction("read_transaction"),
async_autocommit("async_autocommit"),
write_transaction("write_transaction");
sync_read_transaction("sync_read_transaction"),
async_read_transaction("async_read_transaction"),
sync_write_transaction("sync_write_transaction"),
async_write_transaction("async_write_transaction");
private final String value;

View File

@@ -24,7 +24,7 @@ blocks:
ops:
# Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS
delete_nodes:
autocommit: |
sync_autocommit: |
MATCH (n)
CALL { WITH n
DETACH DELETE n
@@ -32,14 +32,14 @@ blocks:
query_params:
delete_batch_size: TEMPLATE(delete_batch_size,5000)
drop_index:
autocommit: DROP INDEX $index_name IF EXISTS
sync_autocommit: DROP INDEX $index_name IF EXISTS
query_params:
index_name: vector_index
schema:
ops:
create_vector_index:
autocommit: |
sync_autocommit: |
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
ON (n.embedding) OPTIONS
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
@@ -51,7 +51,7 @@ blocks:
rampup:
ops:
insert_node:
write_transaction: |
async_write_transaction: |
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
query_params:
id: '{id}'
@@ -61,7 +61,7 @@ blocks:
ops:
# Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5
insert_nodes:
write_transaction: |
async_write_transaction: |
WITH $id_list as ids, $vector_list as vectors
UNWIND RANGE(0, size(ids) - 1) as idx
CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]})
@@ -72,7 +72,7 @@ blocks:
search:
ops:
search:
read_transaction: |
async_read_transaction: |
WITH $query_vector AS queryVector
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
YIELD node

View File

@@ -13,10 +13,13 @@ instance of the Neo4J/Aura database:
## Op Templates
The Neo4J adapter supports three different op types:
- autocommit
- read_transaction
- write_transaction
The Neo4J adapter supports six different op types:
- sync_autocommit
- async_autocommit
- sync_read_transaction
- async_read_transaction
- sync_write_transaction
- async_write_transaction
A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/
@@ -32,7 +35,7 @@ vector search functionality has been properly worked through, currently.
```yaml
ops:
example_create_vector_index:
autocommit: |
sync_autocommit: |
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
ON (n.embedding) OPTIONS
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
@@ -42,14 +45,14 @@ ops:
similarity_function: TEMPLATE(similarity_function,cosine)
example_insert_node:
write_transaction: |
async_write_transaction: |
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
query_params:
id: '{id}'
vector: '{train_vector}'
example_search:
read_transaction: |
async_read_transaction: |
WITH $query_vector AS queryVector
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
YIELD node