merge fixups

This commit is contained in:
Jonathan Shook 2024-04-11 23:48:35 -05:00
commit 99636f3186
32 changed files with 1653 additions and 12 deletions

58
adapter-neo4j/pom.xml Normal file
View File

@ -0,0 +1,58 @@
<!--
~ Copyright (c) 2024 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>adapter-neo4j</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>${revision}</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<description>
An nosqlbench adapter driver module for the Neo4J/Aura database.
</description>
<dependencies>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>5.18.0</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import org.apache.commons.lang3.StringUtils;
import org.neo4j.driver.Record;
import org.neo4j.driver.exceptions.ClientException;
import java.util.NoSuchElementException;
public class Neo4JAdapterUtils {
/**
* Mask the digits in the given string with '*'
*
* @param unmasked The string to mask
* @return The masked string
*/
protected static String maskDigits(String unmasked) {
assert StringUtils.isNotBlank(unmasked) && StringUtils.isNotEmpty(unmasked);
int inputLength = unmasked.length();
StringBuilder masked = new StringBuilder(inputLength);
for (char ch : unmasked.toCharArray()) {
if (Character.isDigit(ch)) {
masked.append("*");
} else {
masked.append(ch);
}
}
return masked.toString();
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Value.html#asObject()
*/
public static Object[] getFieldForAllRecords(Record[] records, String fieldName) {
int n = records.length;
Object[] values = new Object[n];
int idx;
for (int i = 0; i < n; i++) {
try {
idx = records[i].index(fieldName);
values[i] = records[i].get(idx).asObject();
}
catch (NoSuchElementException e) {
throw e;
}
catch (ClientException e) {
throw e;
}
}
return values;
}
}

View File

@ -0,0 +1,53 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.config.standard.NBConfigModel;
import io.nosqlbench.nb.api.config.standard.NBConfiguration;
import io.nosqlbench.nb.api.labels.NBLabels;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "neo4j")
public class Neo4JDriverAdapter extends BaseDriverAdapter<Neo4JBaseOp, Neo4JSpace> {
public Neo4JDriverAdapter(NBComponent parentComponent, NBLabels labels) {
super(parentComponent, labels);
}
@Override
public OpMapper getOpMapper() {
return new Neo4JOpMapper(this, getSpaceCache());
}
@Override
public Function<String, ? extends Neo4JSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new Neo4JSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(Neo4JSpace.getConfigModel());
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapter.diag.DriverAdapterLoader;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
@Service(value = DriverAdapterLoader.class, selector = "neo4j")
public class Neo4JDriverAdapterLoader implements DriverAdapterLoader {
@Override
public Neo4JDriverAdapter load(NBComponent parent, NBLabels childLabels) {
return new Neo4JDriverAdapter(parent, childLabels);
}
}

View File

@ -0,0 +1,58 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.adapter.neo4j.opdispensers.*;
import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import java.util.function.LongFunction;
public class Neo4JOpMapper implements OpMapper<Neo4JBaseOp> {
private final DriverSpaceCache<? extends Neo4JSpace> cache;
private final Neo4JDriverAdapter adapter;
public Neo4JOpMapper(Neo4JDriverAdapter adapter, DriverSpaceCache<? extends Neo4JSpace> cache) {
this.adapter = adapter;
this.cache = cache;
}
@Override
public OpDispenser<? extends Neo4JBaseOp> apply(ParsedOp op) {
TypeAndTarget<Neo4JOpType, String> typeAndTarget = op.getTypeAndTarget(Neo4JOpType.class, String.class);
LongFunction<String> spaceNameFunc = op.getAsFunctionOr("space", "default");
LongFunction<Neo4JSpace> spaceFunc = l -> cache.get(spaceNameFunc.apply(l));
return switch (typeAndTarget.enumId) {
case autocommit -> new Neo4JAutoCommitOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case read_transaction -> new Neo4JReadTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
case write_transaction -> new Neo4JWriteTxnOpDispenser(
adapter, op, spaceFunc, typeAndTarget.enumId.getValue()
);
};
}
}

View File

@ -0,0 +1,100 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j;
import io.nosqlbench.nb.api.config.standard.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import java.util.Optional;
public class Neo4JSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(Neo4JSpace.class);
private final String space;
private Driver driver;
public Neo4JSpace(String space, NBConfiguration cfg) {
this.space = space;
this.driver = initializeDriver(cfg);
driver.verifyConnectivity();
}
private Driver initializeDriver(NBConfiguration cfg) {
String dbURI = cfg.get("db_uri");
Optional<String> usernameOpt = cfg.getOptional("username");
Optional<String> passwordOpt = cfg.getOptional("password");
String username;
String password;
// user has supplied both username and password
if (usernameOpt.isPresent() && passwordOpt.isPresent()) {
username = usernameOpt.get();
password = passwordOpt.get();
logger.info(this.space + ": Creating new Neo4J driver with [" +
"dbURI = " + dbURI +
", username = " + username +
", password = " + Neo4JAdapterUtils.maskDigits(password) +
"]"
);
return GraphDatabase.driver(dbURI, AuthTokens.basic(username, password));
}
// user has only supplied username
else if (usernameOpt.isPresent()) {
String error = "username is present, but password is not defined.";
logger.error(error);
throw new RuntimeException(error);
}
// user has only supplied password
else if (passwordOpt.isPresent()) {
String error = "password is present, but username is not defined.";
logger.error(error);
throw new RuntimeException(error);
}
// user has supplied neither
else {
logger.info(this.space + ": Creating new Neo4J driver with [" +
"dbURI = " + dbURI +
"]"
);
return GraphDatabase.driver(dbURI);
}
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(Neo4JSpace.class)
.add(Param.required("db_uri", String.class))
.add(Param.optional("username", String.class))
.add(Param.optional("password", String.class))
.asReadOnly();
}
public Driver getDriver() {
return driver;
}
@Override
public void close() throws Exception {
if (driver != null){
driver.close();
}
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JAutoCommitOp> createOpFunc() {
return l -> new Neo4JAutoCommitOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.Query;
import java.util.Collections;
import java.util.function.LongFunction;
import java.util.Map;
public abstract class Neo4JBaseOpDispenser extends BaseOpDispenser<Neo4JBaseOp, Neo4JSpace> {
protected final LongFunction<Neo4JSpace> spaceFunc;
protected final LongFunction<String> cypherFunc;
protected final LongFunction<Query> queryFunc;
protected final LongFunction<Map> paramFunc;
protected final LongFunction<Neo4JBaseOp> opFunc;
public Neo4JBaseOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op);
this.spaceFunc = spaceFunc;
this.cypherFunc = op.getAsRequiredFunction(requiredTemplateKey);
this.paramFunc = createParamFunc(op);
this.queryFunc = createQueryFunc();
this.opFunc = (LongFunction<Neo4JBaseOp>) createOpFunc();
}
private LongFunction<Map> createParamFunc(ParsedOp op) {
return op.getAsOptionalFunction("query_params", Map.class)
.orElse(l -> Collections.emptyMap());
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Query.html#withParameters(java.util.Map)
*/
private LongFunction<Query> createQueryFunc() {
return l -> new Query(cypherFunc.apply(l)).withParameters(paramFunc.apply(l));
}
public abstract LongFunction<? extends Neo4JBaseOp> createOpFunc();
@Override
public Neo4JBaseOp getOp(long cycle) {
return opFunc.apply(cycle);
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JReadTxnOp> createOpFunc() {
return l -> new Neo4JReadTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.opdispensers;
import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter;
import io.nosqlbench.adapter.neo4j.Neo4JSpace;
import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp;
import io.nosqlbench.adapter.neo4j.types.Neo4JOpType;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.neo4j.driver.async.AsyncSession;
import java.util.function.LongFunction;
public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser {
public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction<Neo4JSpace> spaceFunc, String requiredTemplateKey) {
super(adapter, op, spaceFunc, requiredTemplateKey);
}
@Override
public LongFunction<Neo4JWriteTxnOp> createOpFunc() {
return l -> new Neo4JWriteTxnOp(
spaceFunc.apply(l).getDriver().session(AsyncSession.class),
queryFunc.apply(l)
);
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import java.util.concurrent.ExecutionException;
public class NBExecutionException extends RuntimeException {
private final ExecutionException exception;
public NBExecutionException(ExecutionException e) {
this.exception = e;
}
@Override
public String getMessage() {
return "Wrapped Exception: " + exception.getMessage();
}
@Override
public StackTraceElement[] getStackTrace() {
return exception.getStackTrace();
}
@Override
public synchronized Throwable getCause() {
return exception.getCause();
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
public class NBInterruptedException extends RuntimeException {
private final InterruptedException exception;
public NBInterruptedException(InterruptedException e) {
this.exception = e;
}
@Override
public String getMessage() {
return "Wrapped Exception: " + exception.getMessage();
}
@Override
public StackTraceElement[] getStackTrace() {
return exception.getStackTrace();
}
@Override
public synchronized Throwable getCause() {
return exception.getCause();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import java.util.concurrent.TimeoutException;
public class NBTimeoutException extends RuntimeException {
private final TimeoutException exception;
public NBTimeoutException(TimeoutException e) {
this.exception = e;
}
@Override
public String getMessage() {
return "Wrapped Exception: " + exception.getMessage();
}
@Override
public StackTraceElement[] getStackTrace() {
return exception.getStackTrace();
}
@Override
public synchronized Throwable getCause() {
return exception.getCause();
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JAutoCommitOp extends Neo4JBaseOp {
public Neo4JAutoCommitOp(AsyncSession session, Query query) {
super(session, query);
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#runAsync(java.lang.String,java.util.Map,org.neo4j.driver.TransactionConfig)
*/
@Override
public final Record[] apply(long value) {
try {
CompletionStage<List<Record>> resultStage = session.runAsync(query).thenComposeAsync(
cursor -> cursor.listAsync().whenComplete(
(records, throwable) -> {
if (throwable != null) {
session.closeAsync();
}
}
)
);
List<Record> recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return recordList.toArray(new Record[recordList.size()]);
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
throw re;
} else throw new NBExecutionException(exe);
} catch (InterruptedException ie) {
throw new NBInterruptedException(ie);
} catch (TimeoutException e) {
throw new NBTimeoutException(e);
}
}
}

View File

@ -0,0 +1,52 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
public abstract class Neo4JBaseOp implements CycleOp<Record[]> {
protected final AsyncSession session;
protected final Query query;
public Neo4JBaseOp(AsyncSession session, Query query) {
this.session = session;
this.query = query;
}
/**
* In the child classes, this method will be responsible for:
* - using the Neo4J AsyncSession object to run the Neo4J Query
* - process the Result to get an array of Records
* - close the AsyncSession
* - Return the array of Records
*
* Session creation and closing is considered light-weight. Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Session.html#close()
*/
public abstract Record[] apply(long value);
@Override
public String toString() {
return "Neo4JBaseOp(" + query.toString().getClass().getSimpleName() + ")";
}
}

View File

@ -0,0 +1,66 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JReadTxnOp extends Neo4JBaseOp{
public Neo4JReadTxnOp(AsyncSession session, Query query) {
super(session, query);
}
/**
* Reference:
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback)
*/
@Override
public final Record[] apply(long value) {
try {
CompletionStage<List<Record>> resultStage = session.executeReadAsync(
txn -> txn.runAsync(query).thenComposeAsync(
cursor -> cursor.listAsync().whenComplete(
(records, throwable) -> {
if (throwable != null) {
session.closeAsync();
}
}
)
)
);
List<Record> recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return recordList.toArray(new Record[recordList.size()]);
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
throw re;
} else throw new NBExecutionException(exe);
} catch (InterruptedException ie) {
throw new NBInterruptedException(ie);
} catch (TimeoutException e) {
throw new NBTimeoutException(e);
}
}
}

View File

@ -0,0 +1,67 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.ops;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.async.AsyncSession;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class Neo4JWriteTxnOp extends Neo4JBaseOp{
public Neo4JWriteTxnOp(AsyncSession session, Query query) {
super(session, query);
}
/**
* References:
* - https://neo4j.com/docs/java-manual/current/async/
* - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback)
*/
@Override
public final Record[] apply(long value) {
try {
CompletionStage<List<Record>> resultStage = session.executeWriteAsync(
txn -> txn.runAsync(query).thenComposeAsync(
cursor -> cursor.listAsync().whenComplete(
(records, throwable) -> {
if (throwable != null) {
session.closeAsync();
}
}
)
)
);
List<Record> recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS);
return recordList.toArray(new Record[recordList.size()]);
} catch (ExecutionException exe) {
Throwable ee = exe.getCause();
if (ee instanceof RuntimeException re) {
throw re;
} else throw new NBExecutionException(exe);
} catch (InterruptedException ie) {
throw new NBInterruptedException(ie);
} catch (TimeoutException e) {
throw new NBTimeoutException(e);
}
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.neo4j.types;
public enum Neo4JOpType {
autocommit("autocommit"),
read_transaction("read_transaction"),
write_transaction("write_transaction");
private final String value;
Neo4JOpType(String value) {
this.value = value;
}
public String getValue(){
return value;
}
}

View File

@ -0,0 +1,99 @@
min_version: 5.21.1
description: |
Sample Neo4J Driver workload for ANN. Responsible for resetting/creating schema, ingesting vector
data from HDF5 file format, and then performing ANN queries against the ingested data
Template Variables:
TEMPLATE(datafile)
TEMPLATE(node_label,Node)
TEMPLATE(k,100)
TEMPLATE(batch_size)
TEMPLATE(delete_batch_size,5000)
bindings:
id: ToString()
id_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),long->ToString());
train_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train");
train_vector_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"));
test_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test");
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors")
blocks:
# TODO: Node deletion times out; attempt this in future: CREATE OR REPLACE DATABASE neo4j
reset-schema:
ops:
# Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS
delete_nodes:
autocommit: |
MATCH (n)
CALL { WITH n
DETACH DELETE n
} IN TRANSACTIONS OF $delete_batch_size ROWS;
query_params:
delete_batch_size: TEMPLATE(delete_batch_size,5000)
drop_index:
autocommit: DROP INDEX $index_name IF EXISTS
query_params:
index_name: vector_index
schema:
ops:
create_vector_index:
autocommit: |
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
ON (n.embedding) OPTIONS
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
query_params:
index_name: vector_index
dimension: TEMPLATE(dimension)
similarity_function: TEMPLATE(similarity_function,cosine)
rampup:
ops:
insert_node:
write_transaction: |
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
query_params:
id: '{id}'
vector: '{train_vector}'
rampup-batch:
ops:
# Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5
insert_nodes:
write_transaction: |
WITH $id_list as ids, $vector_list as vectors
UNWIND RANGE(0, size(ids) - 1) as idx
CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]})
query_params:
id_list: '{id_batch}'
vector_list: '{train_vector_batch}'
search:
ops:
search:
read_transaction: |
WITH $query_vector AS queryVector
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
YIELD node
RETURN node.id
query_params:
query_vector: '{test_vector}'
index_name: vector_index
k: TEMPLATE(k,100)
verifier-init: |
relevancy = new io.nosqlbench.nb.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op);
for (int k in List.of(100)) {
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
}
verifier: |
// result is a Record[]
values = io.nosqlbench.adapter.neo4j.Neo4JAdapterUtils.getFieldForAllRecords(result, "node.id")
ann = values.collect { it.toString().toInteger() }.toArray(new Integer[values.size()])
knn = {relevant_indices}
relevancy.accept(knn, ann);
return true;

View File

@ -0,0 +1,61 @@
# neo4j driver adapter
The neo4j driver adapter is a nb adapter for the Neo4J driver, an open source Java driver for connecting to and
performing operations on an instance of a Neo4J/Aura database. The driver is hosted on github at
https://github.com/neo4j/neo4j-java-driver.
## activity parameters
The following parameters must be supplied to the adapter at runtime in order to successfully connect to an
instance of the Neo4J/Aura database:
* db_uri - the URI for the Neo4J instance for the driver to connect to.
## Op Templates
The Neo4J adapter supports three different op types:
- autocommit
- read_transaction
- write_transaction
A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/
For these different op types, users can specify appropriate Cypher queries to run against the database
## Examples
All examples provided are in the scope of leveraging Neo4J's vector index capabilities. Although,
arbitrary Cypher queries can be run for most involved graph modeling use cases, only a simple
vector search functionality has been properly worked through, currently.
```yaml
ops:
example_create_vector_index:
autocommit: |
CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node))
ON (n.embedding) OPTIONS
{indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}}
query_params:
index_name: vector_index
dimension: TEMPLATE(dimension)
similarity_function: TEMPLATE(similarity_function,cosine)
example_insert_node:
write_transaction: |
CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector})
query_params:
id: '{id}'
vector: '{train_vector}'
example_search:
read_transaction: |
WITH $query_vector AS queryVector
CALL db.index.vector.queryNodes($index_name, $k, queryVector)
YIELD node
RETURN node.id
query_params:
query_vector: '{test_vector}'
index_name: vector_index
k: TEMPLATE(k,100)
```

View File

@ -362,7 +362,7 @@
<dependency>
<groupId>org.graalvm.polyglot</groupId>
<artifactId>polyglot</artifactId>
<version>23.1.0</version>
<version>23.1.2</version>
</dependency>
<dependency>
<groupId>org.graalvm.polyglot</groupId>

View File

@ -43,7 +43,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-dynamodb</artifactId>
<version>1.12.678</version>
<version>1.12.681</version>
</dependency>
</dependencies>

View File

@ -43,6 +43,8 @@ public class NBIO implements NBPathsAPI.Facets {
private static String[] globalIncludes = new String[0];
private static boolean useNBIOCache;
public synchronized static void addGlobalIncludes(String[] globalIncludes) {
NBIO.globalIncludes = globalIncludes;
}
@ -158,12 +160,25 @@ public class NBIO implements NBPathsAPI.Facets {
return this;
}
/**
* {@inheritDoc}
*/
@Override
public NBPathsAPI.GetPrefixes cachedContent() {
this.resolver = URIResolvers.inNBIOCache();
return this;
}
/**
* {@inheritDoc}
*/
@Override
public NBPathsAPI.GetPrefixes allContent() {
this.resolver = URIResolvers.inFS().inCP().inURLs();
if (useNBIOCache) {
this.resolver = URIResolvers.inFS().inCP().inNBIOCache();
} else {
this.resolver = URIResolvers.inFS().inCP().inURLs();
}
return this;
}
@ -343,6 +358,14 @@ public class NBIO implements NBPathsAPI.Facets {
return new NBIO().remoteContent();
}
/**
* Return content from the NBIO cache. If the content is not in the cache look for it in the given
* URL and put it in the cache.
*
* @return this builder
*/
public static NBPathsAPI.GetPrefixes cached() { return new NBIO().cachedContent(); }
/**
* {@inheritDoc}
@ -628,4 +651,13 @@ public class NBIO implements NBPathsAPI.Facets {
", extensionSets=" + extensionSets +
'}';
}
public boolean useNBIOCache() {
return useNBIOCache;
}
public static void setUseNBIOCache(boolean wantsToUseNBIOCache) {
useNBIOCache = wantsToUseNBIOCache;
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.nosqlbench.nb.api.nbio;
public enum NBIOResolverConditions {
UPDATE_AND_VERIFY,
UPDATE_NO_VERIFY,
LOCAL_VERIFY,
LOCAL_NO_VERIFY
}

View File

@ -67,6 +67,14 @@ public interface NBPathsAPI {
*/
GetPrefixes fileContent();
/**
* Return content from the NBIO cache. If the content is not in the cache look for it in the given
* URL and put it in the cache.
*
* @return this builder
*/
GetPrefixes cachedContent();
/**
* Return content from everywhere, from remote URls, or from the file system and then the internal
* bundled content if not found in the file system first.

View File

@ -101,9 +101,11 @@ public class ResolverForClasspath implements ContentResolver {
public List<Path> resolveDirectory(URI uri) {
List<Path> path = resolvePaths(uri);
List<Path> dirs = new ArrayList<>();
for (Path dirpath : path) {
if (Files.isDirectory(dirpath)) {
dirs.add(dirpath);
if (path != null) {
for (Path dirpath : path) {
if (Files.isDirectory(dirpath)) {
dirs.add(dirpath);
}
}
}
return dirs;

View File

@ -0,0 +1,329 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.nosqlbench.nb.api.nbio;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
public class ResolverForNBIOCache implements ContentResolver {
public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache();
private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class);
private static String cacheDir = System.getProperty("user.home") + "/.nosqlbench/nbio-cache/";
private static boolean forceUpdate = false;
private static boolean verifyChecksum = true;
private static int maxRetries = 3;
@Override
public List<Content<?>> resolve(URI uri) {
List<Content<?>> contents = new ArrayList<>();
Path path = resolvePath(uri);
if (path != null) {
contents.add(new PathContent(path));
}
return contents;
}
/**
* This method is used to resolve the path of a given URI.
* It first checks if the URI has a scheme (http or https) and if it does, it tries to resolve the path from the cache.
* If the file is not in the cache, it tries to download it from the remote URL.
* If the URI does not have a scheme, it returns null.
*
* @param uri the URI to resolve the path for
* @return the resolved Path object, or null if the URI does not have a scheme or the path could not be resolved
*/
private Path resolvePath(URI uri) {
if (uri.getScheme() != null && !uri.getScheme().isEmpty() &&
(uri.getScheme().equalsIgnoreCase("http") ||
uri.getScheme().equalsIgnoreCase("https"))) {
Path cachedFilePath = Path.of(cacheDir + uri.getPath());
if (Files.isReadable(cachedFilePath)) {
return pathFromLocalCache(cachedFilePath, uri);
}
else {
return pathFromRemoteUrl(uri);
}
}
return null;
}
private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) {
int retries = 0;
boolean success = false;
while (retries < maxRetries) {
try {
if (this.remoteFileExists(uri)) {
logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachedFilePath);
ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream());
FileOutputStream outputStream = new FileOutputStream(cachedFilePath.toFile());
outputStream.getChannel().transferFrom(channel, 0, Long.MAX_VALUE);
outputStream.close();
channel.close();
logger.info(() -> "Downloaded remote file to cache at " + cachedFilePath);
if(checksum == null || verifyChecksum(cachedFilePath, checksum)) {
success = true;
break;
}
} else {
logger.error(() -> "Error downloading remote file to cache at " + cachedFilePath + ", retrying...");
retries++;
}
} catch (IOException e) {
logger.error(() -> "Error downloading remote file to cache at " + cachedFilePath + ", retrying...");
retries++;
}
}
return success;
}
private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) {
try {
String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString());
Path checksumPath = checksumPath(cachedFilePath);
Files.writeString(checksumPath, localChecksumStr);
logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath);
String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes()));
if (localChecksumStr.equals(remoteChecksum)) {
return true;
} else {
logger.warn(() -> "checksums do not match for " + checksumPath + " and " + checksum);
return false;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static String stripControlCharacters(String input) {
return input.replaceAll("[\\p{Cntrl}]+$", "");
}
/**
* This method is used to download a file from a remote URL and store it in a local cache.
* It first creates the cache directory if it doesn't exist.
* Then it tries to download the file and if successful, it generates a SHA256 checksum for the downloaded file.
* It then compares the generated checksum with the remote checksum.
* If the checksums match, it returns the path to the cached file.
* If the checksums don't match or if there was an error during the download, it cleans up the cache and throws a RuntimeException.
*
* @param uri the URI of the remote file to download
* @return the Path to the downloaded file in the local cache
* @throws RuntimeException if there was an error during the download or if the checksums don't match
*/
private Path pathFromRemoteUrl(URI uri) {
Path cachedFilePath = Path.of(cacheDir + uri.getPath());
createCacheDir(cachedFilePath);
if (!verifyChecksum) {
return execute(NBIOResolverConditions.UPDATE_NO_VERIFY, cachedFilePath, uri);
}
else {
return execute(NBIOResolverConditions.UPDATE_AND_VERIFY, cachedFilePath, uri);
}
}
private void createCacheDir(Path cachedFilePath) {
Path dir = cachedFilePath.getParent();
if (!Files.exists(dir)) {
try {
Files.createDirectories(dir);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private void cleanupCache(Path cachedFilePath) {
if (!cachedFilePath.toFile().delete())
logger.warn(() -> "Could not delete cached file " + cachedFilePath);
Path checksumPath = checksumPath(cachedFilePath);
if (!checksumPath.toFile().delete())
logger.warn(() -> "Could not delete cached checksum " + checksumPath);
}
private Path execute(NBIOResolverConditions condition, Path cachedFilePath, URI uri) {
String remoteChecksumFileStr = uri.getPath() + ".sha256";
URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr)));
switch(condition) {
case UPDATE_AND_VERIFY:
if (checksum == null) {
logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist. Proceeding without verification");
}
if (downloadFile(uri, cachedFilePath, checksum)) {
return cachedFilePath;
} else {
throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath);
}
case UPDATE_NO_VERIFY:
logger.warn(() -> "Checksum verification is disabled, downloading remote file to cache at " + cachedFilePath);
if (downloadFile(uri, cachedFilePath, null)) {
return cachedFilePath;
} else {
throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath);
}
case LOCAL_VERIFY:
if (checksum == null) {
logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachedFilePath);
return cachedFilePath;
}
try {
String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath));
String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes()));
if (localChecksum.equals(remoteChecksum)) {
return cachedFilePath;
}
else {
logger.warn(() -> "Checksums do not match, rehydrating cache " + cachedFilePath);
return pathFromRemoteUrl(uri);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
case LOCAL_NO_VERIFY:
return cachedFilePath;
default:
throw new RuntimeException("Invalid NBIO Cache condition");
}
}
/**
* This method is used to retrieve a file from the local cache.
* It first checks if the file exists in the cache and if a checksum file is present.
* If the checksum file is not present, it generates a new one.
* If the "force update" option is enabled, it deletes the cached file and downloads it from the remote URL.
* If the "checksum verification" option is enabled, it compares the local checksum with the remote checksum.
* If the checksums match, it returns the path to the cached file.
* If the checksums don't match, it deletes the cached file and downloads it from the remote URL.
* If the remote file or checksum does not exist, it returns the cached file.
*
* @param cachedFilePath the Path to the cached file
* @param uri the URI of the remote file
* @return the Path to the cached file
* @throws RuntimeException if there was an error during the checksum comparison or if the checksums don't match
*/
private Path pathFromLocalCache(Path cachedFilePath, URI uri) {
if (forceUpdate) {
return pathFromRemoteUrl(uri);
}
if (!verifyChecksum) {
logger.warn(() -> "Checksum verification is disabled, returning cached file " + cachedFilePath);
return execute(NBIOResolverConditions.LOCAL_NO_VERIFY, cachedFilePath, uri);
} else {
return execute(NBIOResolverConditions.LOCAL_VERIFY, cachedFilePath, uri);
}
}
private Path getOrCreateChecksum(Path cachedFilePath) {
Path checksumPath = checksumPath(cachedFilePath);
if (!Files.isReadable(checksumPath)) {
try {
Files.writeString(checksumPath, generateSHA256Checksum(cachedFilePath.toString()));
} catch (IOException | NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
return checksumPath;
}
private Path checksumPath(Path cachedFilePath) {
return Path.of(cachedFilePath + ".sha256");
}
private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("SHA-256");
try (InputStream is = new FileInputStream(filePath)) {
byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
md.update(buffer, 0, bytesRead);
}
}
byte[] digest = md.digest();
StringBuilder sb = new StringBuilder();
for (byte b : digest) {
sb.append(String.format("%02x", b));
}
return sb.toString();
}
private URLContent resolveURI(URI uri) {
try {
URL url = uri.toURL();
InputStream inputStream = url.openStream();
logger.debug(() -> "Found accessible remote file at " + url);
return new URLContent(url, inputStream);
} catch (IOException e) {
logger.error(() -> "Unable to find content at URI '" + uri + "', this often indicates a configuration error.");
return null;
}
}
private boolean remoteFileExists(URI uri) {
try {
HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
connection.setRequestMethod("HEAD");
int responseCode = connection.getResponseCode();
return responseCode == HttpURLConnection.HTTP_OK;
} catch (Exception e) {
return false; // Error occurred or file does not exist
}
}
@Override
public List<Path> resolveDirectory(URI uri) {
List<Path> dirs = new ArrayList<>();
Path path = Path.of(cacheDir + uri.getPath());
if (Files.isDirectory(path)) {
dirs.add(path);
}
return dirs;
}
public static void setCacheDir(String cacheDir) {
ResolverForNBIOCache.cacheDir = cacheDir;
}
public static void setForceUpdate(boolean forceUpdate) {
ResolverForNBIOCache.forceUpdate = forceUpdate;
}
public static void setVerifyChecksum(boolean verifyChecksum) {
ResolverForNBIOCache.verifyChecksum = verifyChecksum;
}
public static void setMaxRetries(int maxRetries) {
ResolverForNBIOCache.maxRetries = maxRetries;
}
}

View File

@ -36,7 +36,8 @@ public class URIResolver implements ContentResolver {
private static final List<ContentResolver> EVERYWHERE = List.of(
ResolverForURL.INSTANCE,
ResolverForFilesystem.INSTANCE,
ResolverForClasspath.INSTANCE
ResolverForClasspath.INSTANCE,
ResolverForNBIOCache.INSTANCE
);
private List<String> extensions;
@ -87,6 +88,16 @@ public class URIResolver implements ContentResolver {
return this;
}
/**
* Include resources within the NBIO cache or download them if they are not found.
*
* @return this URISearch
*/
public URIResolver inNBIOCache() {
loaders.add(ResolverForNBIOCache.INSTANCE);
return this;
}
public List<Content<?>> resolve(String uri) {
return resolve(URI.create(uri));
}

View File

@ -52,4 +52,8 @@ public class URIResolvers {
public static URIResolver inClasspath() {
return new URIResolver().inCP();
}
public static URIResolver inNBIOCache() {
return new URIResolver().inNBIOCache();
}
}

View File

@ -49,6 +49,7 @@ import io.nosqlbench.engine.core.logging.NBLoggerConfig;
import io.nosqlbench.engine.core.metadata.MarkdownFinder;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.nb.annotations.ServiceSelector;
import io.nosqlbench.nb.api.nbio.ResolverForNBIOCache;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.ConfigurationFactory;
@ -220,6 +221,26 @@ public class NBCLI implements Function<String[], Integer>, NBLabeledElement {
NBCLI.logger = LogManager.getLogger("NBCLI");
NBIO.addGlobalIncludes(options.wantsIncludes());
NBIO.setUseNBIOCache(options.wantsToUseNBIOCache());
if(options.wantsToUseNBIOCache()) {
logger.info(() -> "Configuring options for NBIO Cache");
logger.info(() -> "Setting NBIO Cache Force Update to " + options.wantsNbioCacheForceUpdate());
ResolverForNBIOCache.setForceUpdate(options.wantsNbioCacheForceUpdate());
logger.info(() -> "Setting NBIO Cache Verify Checksum to " + options.wantsNbioCacheVerify());
ResolverForNBIOCache.setVerifyChecksum(options.wantsNbioCacheVerify());
if (options.getNbioCacheDir() != null) {
logger.info(() -> "Setting NBIO Cache directory to " + options.getNbioCacheDir());
ResolverForNBIOCache.setCacheDir(options.getNbioCacheDir());
}
if (options.getNbioCacheMaxRetries() != null) {
try {
ResolverForNBIOCache.setMaxRetries(Integer.parseInt(options.getNbioCacheMaxRetries()));
logger.info(() -> "Setting NBIO Cache max retries to " + options.getNbioCacheMaxRetries());
} catch (NumberFormatException e) {
logger.error("Invalid value for nbio-cache-max-retries: " + options.getNbioCacheMaxRetries());
}
}
}
if (options.wantsBasicHelp()) {
System.out.println(this.loadHelpFile("basic.md"));

View File

@ -137,6 +137,11 @@ public class NBCLIOptions {
private static final String DEFAULT_CONSOLE_PATTERN = "TERSE";
private static final String DEFAULT_LOGFILE_PATTERN = "VERBOSE";
private final static String ENABLE_DEDICATED_VERIFICATION_LOGGER = "--enable-dedicated-verification-logging";
private final static String USE_NBIO_CACHE = "--use-nbio-cache";
private final static String NBIO_CACHE_FORCE_UPDATE = "--nbio-cache-force-update";
private final static String NBIO_CACHE_NO_VERIFY = "--nbio-cache-no-verify";
private final static String NBIO_CACHE_DIR = "--nbio-cache-dir";
private final static String NBIO_CACHE_MAX_RETRIES = "--nbio-cache-max-retries";
// private static final String DEFAULT_CONSOLE_LOGGING_PATTERN = "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n";
@ -206,6 +211,11 @@ public class NBCLIOptions {
private String metricsLabelSpec = "";
private String wantsToCatResource = "";
private long heartbeatIntervalMs = 10000;
private boolean useNBIOCache = false;
private boolean nbioCacheForceUpdate = false;
private boolean nbioCacheVerify = true;
private String nbioCacheDir;
private String nbioCacheMaxRetries;
public boolean wantsLoggedMetrics() {
return this.wantsConsoleMetrics;
@ -651,6 +661,26 @@ public class NBCLIOptions {
this.heartbeatIntervalMs =
Long.parseLong(this.readWordOrThrow(arglist, "heartbeat interval in ms"));
break;
case USE_NBIO_CACHE:
arglist.removeFirst();
this.useNBIOCache = true;
break;
case NBIO_CACHE_FORCE_UPDATE:
arglist.removeFirst();
this.nbioCacheForceUpdate = true;
break;
case NBIO_CACHE_NO_VERIFY:
arglist.removeFirst();
this.nbioCacheVerify = false;
break;
case NBCLIOptions.NBIO_CACHE_DIR:
arglist.removeFirst();
this.nbioCacheDir = this.readWordOrThrow(arglist, "a NBIO cache directory");
break;
case NBIO_CACHE_MAX_RETRIES:
arglist.removeFirst();
this.nbioCacheMaxRetries = this.readWordOrThrow(arglist, "the maximum number of attempts to fetch a resource from the cache");
break;
default:
nonincludes.addLast(arglist.removeFirst());
}
@ -812,6 +842,21 @@ public class NBCLIOptions {
public NBLogLevel getConsoleLogLevel() {
return this.consoleLevel;
}
public boolean wantsToUseNBIOCache() {
return this.useNBIOCache;
}
public boolean wantsNbioCacheForceUpdate() {
return nbioCacheForceUpdate;
}
public boolean wantsNbioCacheVerify() {
return nbioCacheVerify;
}
public String getNbioCacheDir() {
return nbioCacheDir;
}
public String getNbioCacheMaxRetries() {
return nbioCacheMaxRetries;
}
private String readWordOrThrow(final LinkedList<String> arglist, final String required) {
if (null == arglist.peekFirst())

View File

@ -47,12 +47,46 @@
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.apfloat</groupId>
<artifactId>apfloat</artifactId>
<version>1.13.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.matheclipse</groupId>-->
<!-- <artifactId>matheclipse-core</artifactId>-->
<!-- <version>3.1.0-SNAPSHOT</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>*</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.logging.log4j</groupId>-->
<!-- <artifactId>*</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.log4j</groupId>-->
<!-- <artifactId>*</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.matheclipse</groupId>-->
<!-- <artifactId>matheclipse-gpl</artifactId>-->
<!-- <version>3.1.0-SNAPSHOT</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>*</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.logging.log4j</groupId>-->
<!-- <artifactId>*</artifactId>-->
<!-- </exclusion>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.log4j</groupId>-->
<!-- <artifactId>*</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.matheclipse</groupId>-->
<!-- <artifactId>matheclipse-core</artifactId>-->