initial version of neo4j driver adapter

This commit is contained in:
ShaunakDas88 2024-04-08 11:59:43 -07:00
parent 623087ad42
commit 02b36a282d
22 changed files with 1152 additions and 0 deletions

58
adapter-neo4j/pom.xml Normal file
View File

@ -0,0 +1,58 @@
<!--
~ Copyright (c) 2024 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>adapter-neo4j</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>${revision}</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<description>
An nosqlbench adapter driver module for the Neo4J/Aura database.
</description>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>5.18.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ClientException;
import java.util.NoSuchElementException;
public class Neo4JAdapterUtils {
/**
* Mask the digits in the given string with '*'
*
* @param unmasked The string to mask
* @return The masked string
*/
protected static String maskDigits(String unmasked) {
assert StringUtils.isNotBlank(unmasked) && StringUtils.isNotEmpty(unmasked);
int inputLength = unmasked.length();
StringBuilder masked = new StringBuilder(inputLength);
for (char ch : unmasked.toCharArray()) {
if (Character.isDigit(ch)) {
masked.append("*");
} else {
masked.append(ch);
}
}
return masked.toString();
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Value.html#asObject()
*/
public static Object[] getFieldForAllRecords(Record[] records, String fieldName) {
int n = records.length;
Object[] values = new Object[n];
int idx;
for (int i = 0; i < n; i++) {
try {
idx = records[i].index(fieldName);
values[i] = records[i].get(idx).asObject();
}
catch (NoSuchElementException e) {
throw e;
}
catch (ClientException e) {
throw e;
}
}
return values;
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "neo4j")
public class Neo4JDriverAdapter extends BaseDriverAdapter<Neo4JBaseOp, Neo4JSpace> {
public Neo4JDriverAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels);
}
@Override
public OpMapper getOpMapper() {
return new Neo4JOpMapper(this, getSpaceCache());
}
@Override
public Function<String, ? extends Neo4JSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new Neo4JSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(Neo4JSpace.getConfigModel());
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
@Service(value = DriverAdapterLoader.class, selector = "neo4j")
public class Neo4JDriverAdapterLoader implements DriverAdapterLoader {
@Override
public Neo4JDriverAdapter load(NBComponent parent, NBLabels childLabels) {
return new Neo4JDriverAdapter(parent, childLabels);
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapter.neo4j.opdispensers.*;
import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import java.util.function.LongFunction;
public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp> {
private final DriverSpaceCache<? extends Neo4JSpace> cache;
private final Neo4JDriverAdapter adapter;
public Neo4JOpMapper(Neo4JDriverAdapter adapter, DriverSpaceCache<? extends Neo4JSpace> cache) {
this.adapter = adapter;
this.cache = cache;
}
@Override
public OpDispenser<? extends Neo4JBaseOp> apply(ParsedOp op) {
TypeAndTarget<Neo4JOpType, String> typeAndTarget = op.getTypeAndTarget(Neo4JOpType.class, String.class);
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
return switch (typeAndTarget.enumId) {
case autocommit -> new Neo4JAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case read_transaction -> new Neo4JReadTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case write_transaction -> new Neo4JWriteTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
};
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.nb.api.config.standard.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import java.util.Optional;
public class Neo4JSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(Neo4JSpace.class);
private final String space;
private Driver driver;
public Neo4JSpace(String space, NBConfiguration cfg) {
this.space = space;
this.driver = initializeDriver(cfg);
driver.verifyConnectivity();
}
private Driver initializeDriver(NBConfiguration cfg) {
String dbURI = cfg.get("db_uri");
Optional<String> usernameOpt = cfg.getOptional("username");
Optional<String> passwordOpt = cfg.getOptional("password");
String username;
String password;
// user has supplied both username and password
if (usernameOpt.isPresent() && passwordOpt.isPresent()) {
username = usernameOpt.get();
password = passwordOpt.get();
logger.info(this.space + ": Creating new Neo4J driver with [" +
"dbURI = " + dbURI +
", username = " + username +
", password = " + Neo4JAdapterUtils.maskDigits(password) +
"]"
);
return GraphDatabase.driver(dbURI, AuthTokens.basic(username, password));
}
// user has only supplied username
else if (usernameOpt.isPresent()) {
String error = "username is present, but password is not defined.";
logger.error(error);
throw new RuntimeException(error);
}
// user has only supplied password
else if (passwordOpt.isPresent()) {
String error = "password is present, but username is not defined.";
logger.error(error);
throw new RuntimeException(error);
}
// user has supplied neither
else {
logger.info(this.space + ": Creating new Neo4J driver with [" +
"dbURI = " + dbURI +
"]"
);
return GraphDatabase.driver(dbURI);
}
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(Neo4JSpace.class)
.add(Param.required("db_uri", String.class))
.add(Param.optional("username", String.class))
.add(Param.optional("password", String.class))
.asReadOnly();
}
public Driver getDriver() {
return driver;
}
@Override
public void close() throws Exception {
if (driver != null){
driver.close();
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JAutoCommitOp> createOpFunc() {
return l -> new Neo4JAutoCommitOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.Query;
import java.util.Collections;
import java.util.function.LongFunction;
import java.util.Map;
public abstract class Neo4JBaseOpDispenser extends BaseOpDispenser<Neo4JBaseOp, Neo4JSpace> {
protected final LongFunction<Neo4JSpace> spaceFunc;
protected final LongFunction<String> cypherFunc;
protected final LongFunction<Query> queryFunc;
protected final LongFunction<Map> paramFunc;
protected final LongFunction<Neo4JBaseOp> opFunc;
public Neo4JBaseOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op);
this.spaceFunc = spaceFunc;
this.cypherFunc = op.getAsRequiredFunction(requiredTemplateKey);
this.paramFunc = createParamFunc(op);
this.queryFunc = createQueryFunc();
this.opFunc = (LongFunction<Neo4JBaseOp>) createOpFunc();
}
private LongFunction<Map> createParamFunc(ParsedOp op) {
return op.getAsOptionalFunction("query_params", Map.class)
.orElse(l -> Collections.emptyMap());
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Query.html#withParameters(java.util.Map)
*/
private LongFunction<Query> createQueryFunc() {
return l -> new Query(cypherFunc.apply(l)).withParameters(paramFunc.apply(l));
}
public abstract LongFunction<? extends Neo4JBaseOp> createOpFunc();
@Override
public Neo4JBaseOp apply(long cycle) {
return opFunc.apply(cycle);
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JReadTxnOp> createOpFunc() {
return l -> new Neo4JReadTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JWriteTxnOp> createOpFunc() {
return l -> new Neo4JWriteTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import java.util.concurrent.ExecutionException;
public class NBExecutionException extends RuntimeException {
private final ExecutionException exception;
public NBExecutionException(ExecutionException e) {
this.exception = e;
}
@Override
public String getMessage() {
return "Wrapped Exception: " + exception.getMessage();
}
@Override
public StackTraceElement[] getStackTrace() {
return exception.getStackTrace();
}
@Override
public synchronized Throwable getCause() {
return exception.getCause();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
public class NBInterruptedException extends RuntimeException {
private final InterruptedException exception;
public NBInterruptedException(InterruptedException e) {
this.exception = e;
}
@Override
public String getMessage() {
return "Wrapped Exception: " + exception.getMessage();
}
@Override
public StackTraceElement[] getStackTrace() {
return exception.getStackTrace();
}
@Override
public synchronized Throwable getCause() {
return exception.getCause();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import java.util.concurrent.TimeoutException;
public class NBTimeoutException extends RuntimeException {
private final TimeoutException exception;
public NBTimeoutException(TimeoutException e) {
this.exception = e;
}
@Override
public String getMessage() {
return "Wrapped Exception: " + exception.getMessage();
}
@Override
public StackTraceElement[] getStackTrace() {
return exception.getStackTrace();
}
@Override
public synchronized Throwable getCause() {
return exception.getCause();
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JAutoCommitOp extends Neo4JBaseOp {
public Neo4JAutoCommitOp(AsyncSession session, Query query) {
super(session, query);
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#runAsync(java.lang.String,java.util.Map,org.neo4j.driver.TransactionConfig)
*/
@Override
public final Record[] apply(long value) {
try {
CompletionStage<List<Record>> resultStage = session.runAsync(query).thenComposeAsync(
cursor -> cursor.listAsync().whenComplete(
(records, throwable) -> {
if (throwable != null) {
session.closeAsync();
}
}
)
);
List<Record> recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return recordList.toArray(new Record[recordList.size()]);
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
throw re;
} else throw new NBExecutionException(exe);
} catch (InterruptedException ie) {
throw new NBInterruptedException(ie);
} catch (TimeoutException e) {
throw new NBTimeoutException(e);
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
public abstract class Neo4JBaseOp implements CycleOp<Record[]> {
protected final AsyncSession session;
protected final Query query;
public Neo4JBaseOp(AsyncSession session, Query query) {
this.session = session;
this.query = query;
}
/**
* In the child classes, this method will be responsible for:
* - using the Neo4J AsyncSession object to run the Neo4J Query
* - process the Result to get an array of Records
* - close the AsyncSession
* - Return the array of Records
*
* Session creation and closing is considered light-weight. Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Session.html#close()
*/
public abstract Record[] apply(long value);
@Override
public String toString() {
return "Neo4JBaseOp(" + query.toString().getClass().getSimpleName() + ")";
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JReadTxnOp extends Neo4JBaseOp{
public Neo4JReadTxnOp(AsyncSession session, Query query) {
super(session, query);
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback)
*/
@Override
public final Record[] apply(long value) {
try {
CompletionStage<List<Record>> resultStage = session.executeReadAsync(
txn -> txn.runAsync(query).thenComposeAsync(
cursor -> cursor.listAsync().whenComplete(
(records, throwable) -> {
if (throwable != null) {
session.closeAsync();
}
}
)
)
);
List<Record> recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return recordList.toArray(new Record[recordList.size()]);
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
throw re;
} else throw new NBExecutionException(exe);
} catch (InterruptedException ie) {
throw new NBInterruptedException(ie);
} catch (TimeoutException e) {
throw new NBTimeoutException(e);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JWriteTxnOp extends Neo4JBaseOp{
public Neo4JWriteTxnOp(AsyncSession session, Query query) {
super(session, query);
}
/**
* References:
* - https://neo4j.com/docs/java-manual/current/async/
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback)
*/
@Override
public final Record[] apply(long value) {
try {
CompletionStage<List<Record>> resultStage = session.executeWriteAsync(
txn -> txn.runAsync(query).thenComposeAsync(
cursor -> cursor.listAsync().whenComplete(
(records, throwable) -> {
if (throwable != null) {
session.closeAsync();
}
}
)
)
);
List<Record> recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return recordList.toArray(new Record[recordList.size()]);
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
throw re;
} else throw new NBExecutionException(exe);
} catch (InterruptedException ie) {
throw new NBInterruptedException(ie);
} catch (TimeoutException e) {
throw new NBTimeoutException(e);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.types;
public enum Neo4JOpType {
autocommit("autocommit"),
read_transaction("read_transaction"),
write_transaction("write_transaction");
private final String value;
Neo4JOpType(String value) {
this.value = value;
}
public String getValue(){
return value;
}
}

View File

@ -0,0 +1,99 @@
min_version: 5.21.1
description: |
Sample Neo4J Driver workload for ANN. Responsible for resetting/creating schema, ingesting vector
data from HDF5 file format, and then performing ANN queries against the ingested data
Template Variables:
TEMPLATE(datafile)
TEMPLATE(node_label,Node)
TEMPLATE(k,100)
TEMPLATE(batch_size)
TEMPLATE(delete_batch_size,5000)
bindings:
id: ToString()
id_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),long->ToString());
train_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train");
train_vector_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"));
test_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test");
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors")
blocks:
# TODO: Node deletion times out; attempt this in future: CREATE OR REPLACE DATABASE neo4j
reset-schema:
ops:
# Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS
delete_nodes:
autocommit: |
MATCH (n)
CALL { WITH n
DETACH DELETE n
} IN TRANSACTIONS OF $delete_batch_size ROWS;
query_params:
delete_batch_size: TEMPLATE(delete_batch_size,5000)
drop_index:
autocommit: DROP INDEX $index_name IF EXISTS
query_params:
index_name: vector_index
schema:
ops:
create_vector_index:
autocommit: |
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
ON (n.embedding) OPTIONS
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
query_params:
index_name: vector_index
dimension: TEMPLATE(dimension)
similarity_function: TEMPLATE(similarity_function,cosine)
rampup:
ops:
insert_node:
write_transaction: |
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
query_params:
id: '{id}'
vector: '{train_vector}'
rampup-batch:
ops:
# Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5
insert_nodes:
write_transaction: |
WITH $id_list as ids, $vector_list as vectors
UNWIND RANGE(0, size(ids) - 1) as idx
CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]})
query_params:
id_list: '{id_batch}'
vector_list: '{train_vector_batch}'
search:
ops:
search:
read_transaction: |
WITH $query_vector AS queryVector
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
YIELD node
RETURN node.id
query_params:
query_vector: '{test_vector}'
index_name: vector_index
k: TEMPLATE(k,100)
verifier-init: |
relevancy = new io.nosqlbench.nb.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op);
for (int k in List.of(100)) {
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
}
verifier: |
// result is a Record[]
values = io.nosqlbench.adapter.neo4j.Neo4JAdapterUtils.getFieldForAllRecords(result, "node.id")
ann = values.collect { it.toString().toInteger() }.toArray(new Integer[values.size()])
knn = {relevant_indices}
relevancy.accept(knn, ann);
return true;

View File

@ -0,0 +1,61 @@
# neo4j driver adapter
The neo4j driver adapter is a nb adapter for the Neo4J driver, an open source Java driver for connecting to and
performing operations on an instance of a Neo4J/Aura database. The driver is hosted on github at
https://github.com/neo4j/neo4j-java-driver.
## activity parameters
The following parameters must be supplied to the adapter at runtime in order to successfully connect to an
instance of the Neo4J/Aura database:
* db_uri - the URI for the Neo4J instance for the driver to connect to.
## Op Templates
The Neo4J adapter supports three different op types:
- autocommit
- read_transaction
- write_transaction
A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/
For these different op types, users can specify appropriate Cypher queries to run against the database
## Examples
All examples provided are in the scope of leveraging Neo4J's vector index capabilities. Although,
arbitrary Cypher queries can be run for most involved graph modeling use cases, only a simple
vector search functionality has been properly worked through, currently.
```yaml
ops:
example_create_vector_index:
autocommit: |
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
ON (n.embedding) OPTIONS
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
query_params:
index_name: vector_index
dimension: TEMPLATE(dimension)
similarity_function: TEMPLATE(similarity_function,cosine)
example_insert_node:
write_transaction: |
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
query_params:
id: '{id}'
vector: '{train_vector}'
example_search:
read_transaction: |
WITH $query_vector AS queryVector
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
YIELD node
RETURN node.id
query_params:
query_vector: '{test_vector}'
index_name: vector_index
k: TEMPLATE(k,100)
```

View File

@ -111,6 +111,11 @@
<artifactId>adapter-mongodb</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-neo4j</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-aws-opensearch</artifactId>
@ -331,6 +336,22 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>adapter-neo4j</id>
<activation>
<!-- <activeByDefault>true</activeByDefault>-->
<file>
<exists>../adapter-neo4j/target</exists>
</file>
</activation>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapter-neo4j</artifactId>
<version>${revision}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>adapter-tcp</id>
<activation>

View File

@ -69,6 +69,7 @@
<module>adapter-jdbc</module>
<module>adapter-milvus</module>
<module>adapter-mongodb</module>
<module>adapter-neo4j</module>
<module>adapter-aws-opensearch</module>
<module>adapter-cqld4</module>
<module>adapter-s4j</module>