diff --git a/adapter-neo4j/pom.xml b/adapter-neo4j/pom.xml new file mode 100644 index 000000000..94bbdee00 --- /dev/null +++ b/adapter-neo4j/pom.xml @@ -0,0 +1,58 @@ + + + + 4.0.0 + + adapter-neo4j + jar + + + mvn-defaults + io.nosqlbench + ${revision} + ../mvn-defaults + + + ${project.artifactId} + + An nosqlbench adapter driver module for the Neo4J/Aura database. + + + + + + io.nosqlbench + nb-annotations + ${revision} + compile + + + io.nosqlbench + adapters-api + ${revision} + compile + + + org.neo4j.driver + neo4j-java-driver + 5.18.0 + + + + + diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java new file mode 100644 index 000000000..ed60e8d64 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JAdapterUtils.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.nosqlbench.adapter.neo4j; + +import org.apache.commons.lang3.StringUtils; + +import org.neo4j.driver.Record; +import org.neo4j.driver.exceptions.ClientException; + +import java.util.NoSuchElementException; + + +public class Neo4JAdapterUtils { + + /** + * Mask the digits in the given string with '*' + * + * @param unmasked The string to mask + * @return The masked string + */ + protected static String maskDigits(String unmasked) { + assert StringUtils.isNotBlank(unmasked) && StringUtils.isNotEmpty(unmasked); + int inputLength = unmasked.length(); + StringBuilder masked = new StringBuilder(inputLength); + for (char ch : unmasked.toCharArray()) { + if (Character.isDigit(ch)) { + masked.append("*"); + } else { + masked.append(ch); + } + } + return masked.toString(); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Value.html#asObject() + */ + public static Object[] getFieldForAllRecords(Record[] records, String fieldName) { + int n = records.length; + Object[] values = new Object[n]; + int idx; + for (int i = 0; i < n; i++) { + try { + idx = records[i].index(fieldName); + values[i] = records[i].get(idx).asObject(); + } + catch (NoSuchElementException e) { + throw e; + } + catch (ClientException e) { + throw e; + } + } + return values; + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java new file mode 100644 index 000000000..833fb8630 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapter.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp; +import io.nosqlbench.adapters.api.activityimpl.OpMapper; +import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; +import io.nosqlbench.nb.annotations.Service; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.config.standard.NBConfigModel; +import io.nosqlbench.nb.api.config.standard.NBConfiguration; +import io.nosqlbench.nb.api.labels.NBLabels; + +import java.util.function.Function; + + +@Service(value = DriverAdapter.class, selector = "neo4j") +public class Neo4JDriverAdapter extends BaseDriverAdapter { + + public Neo4JDriverAdapter(NBComponent parentComponent, NBLabels labels) { + super(parentComponent, labels); + } + + @Override + public OpMapper getOpMapper() { + return new Neo4JOpMapper(this, getSpaceCache()); + } + + @Override + public Function getSpaceInitializer(NBConfiguration cfg) { + return (s) -> new Neo4JSpace(s, cfg); + } + + @Override + public NBConfigModel getConfigModel() { + return super.getConfigModel().add(Neo4JSpace.getConfigModel()); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java new file mode 100644 index 000000000..53f9098b0 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JDriverAdapterLoader.java @@ -0,0 +1,30 @@ +/* + * 
Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.adapter.diag.DriverAdapterLoader; +import io.nosqlbench.nb.annotations.Service; +import io.nosqlbench.nb.api.components.core.NBComponent; +import io.nosqlbench.nb.api.labels.NBLabels; + +@Service(value = DriverAdapterLoader.class, selector = "neo4j") +public class Neo4JDriverAdapterLoader implements DriverAdapterLoader { + @Override + public Neo4JDriverAdapter load(NBComponent parent, NBLabels childLabels) { + return new Neo4JDriverAdapter(parent, childLabels); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java new file mode 100644 index 000000000..918d52f4d --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JOpMapper.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.adapter.neo4j.opdispensers.*; +import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp; +import io.nosqlbench.adapter.neo4j.types.Neo4JOpType; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.activityimpl.OpMapper; +import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.nosqlbench.engine.api.templating.TypeAndTarget; + +import java.util.function.LongFunction; + + +public class Neo4JOpMapper implements OpMapper { + private final DriverSpaceCache cache; + private final Neo4JDriverAdapter adapter; + + public Neo4JOpMapper(Neo4JDriverAdapter adapter, DriverSpaceCache cache) { + this.adapter = adapter; + this.cache = cache; + } + + @Override + public OpDispenser apply(ParsedOp op) { + TypeAndTarget typeAndTarget = op.getTypeAndTarget(Neo4JOpType.class, String.class); + LongFunction spaceNameFunc = op.getAsFunctionOr("space", "default"); + LongFunction spaceFunc = l -> cache.get(spaceNameFunc.apply(l)); + return switch (typeAndTarget.enumId) { + case autocommit -> new Neo4JAutoCommitOpDispenser( + adapter, op, spaceFunc, typeAndTarget.enumId.getValue() + ); + case read_transaction -> new Neo4JReadTxnOpDispenser( + adapter, op, spaceFunc, typeAndTarget.enumId.getValue() + ); + case write_transaction -> new Neo4JWriteTxnOpDispenser( + adapter, op, spaceFunc, typeAndTarget.enumId.getValue() + ); + }; + } +} + diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java new file mode 100644 index 000000000..82e06a473 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/Neo4JSpace.java @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed 
under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j; + +import io.nosqlbench.nb.api.config.standard.*; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Driver; +import org.neo4j.driver.GraphDatabase; + +import java.util.Optional; + +public class Neo4JSpace implements AutoCloseable { + + private final static Logger logger = LogManager.getLogger(Neo4JSpace.class); + private final String space; + private Driver driver; + + public Neo4JSpace(String space, NBConfiguration cfg) { + this.space = space; + this.driver = initializeDriver(cfg); + driver.verifyConnectivity(); + } + + private Driver initializeDriver(NBConfiguration cfg) { + String dbURI = cfg.get("db_uri"); + Optional usernameOpt = cfg.getOptional("username"); + Optional passwordOpt = cfg.getOptional("password"); + String username; + String password; + // user has supplied both username and password + if (usernameOpt.isPresent() && passwordOpt.isPresent()) { + username = usernameOpt.get(); + password = passwordOpt.get(); + logger.info(this.space + ": Creating new Neo4J driver with [" + + "dbURI = " + dbURI + + ", username = " + username + + ", password = " + Neo4JAdapterUtils.maskDigits(password) + + "]" + ); + return GraphDatabase.driver(dbURI, AuthTokens.basic(username, password)); + } + // user has only supplied username + else if (usernameOpt.isPresent()) { + String error = 
"username is present, but password is not defined."; + logger.error(error); + throw new RuntimeException(error); + } + // user has only supplied password + else if (passwordOpt.isPresent()) { + String error = "password is present, but username is not defined."; + logger.error(error); + throw new RuntimeException(error); + } + // user has supplied neither + else { + logger.info(this.space + ": Creating new Neo4J driver with [" + + "dbURI = " + dbURI + + "]" + ); + return GraphDatabase.driver(dbURI); + } + } + + public static NBConfigModel getConfigModel() { + return ConfigModel.of(Neo4JSpace.class) + .add(Param.required("db_uri", String.class)) + .add(Param.optional("username", String.class)) + .add(Param.optional("password", String.class)) + .asReadOnly(); + } + + public Driver getDriver() { + return driver; + } + + @Override + public void close() throws Exception { + if (driver != null){ + driver.close(); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java new file mode 100644 index 000000000..f3899245e --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JAutoCommitOpDispenser.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.ops.Neo4JAutoCommitOp; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapters.api.templating.ParsedOp; + +import org.neo4j.driver.async.AsyncSession; + +import java.util.function.LongFunction; + + +public class Neo4JAutoCommitOpDispenser extends Neo4JBaseOpDispenser { + + public Neo4JAutoCommitOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op, spaceFunc, requiredTemplateKey); + } + + @Override + public LongFunction createOpFunc() { + return l -> new Neo4JAutoCommitOp( + spaceFunc.apply(l).getDriver().session(AsyncSession.class), + queryFunc.apply(l) + ); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java new file mode 100644 index 000000000..ce852c886 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JBaseOpDispenser.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.ops.Neo4JBaseOp; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser; +import io.nosqlbench.adapters.api.templating.ParsedOp; + +import org.neo4j.driver.Query; + +import java.util.Collections; +import java.util.function.LongFunction; +import java.util.Map; + + +public abstract class Neo4JBaseOpDispenser extends BaseOpDispenser { + protected final LongFunction spaceFunc; + protected final LongFunction cypherFunc; + protected final LongFunction queryFunc; + protected final LongFunction paramFunc; + protected final LongFunction opFunc; + + public Neo4JBaseOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op); + this.spaceFunc = spaceFunc; + this.cypherFunc = op.getAsRequiredFunction(requiredTemplateKey); + this.paramFunc = createParamFunc(op); + this.queryFunc = createQueryFunc(); + this.opFunc = (LongFunction) createOpFunc(); + } + + private LongFunction createParamFunc(ParsedOp op) { + return op.getAsOptionalFunction("query_params", Map.class) + .orElse(l -> Collections.emptyMap()); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Query.html#withParameters(java.util.Map) + */ + private LongFunction createQueryFunc() { + return l -> new Query(cypherFunc.apply(l)).withParameters(paramFunc.apply(l)); + } + + public abstract LongFunction createOpFunc(); + + @Override + public Neo4JBaseOp getOp(long cycle) { + return opFunc.apply(cycle); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java new file mode 100644 index 000000000..770d148cf --- /dev/null +++ 
b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JReadTxnOpDispenser.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapter.neo4j.ops.Neo4JReadTxnOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.neo4j.driver.async.AsyncSession; + +import java.util.function.LongFunction; + + +public class Neo4JReadTxnOpDispenser extends Neo4JBaseOpDispenser { + public Neo4JReadTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op, spaceFunc, requiredTemplateKey); + } + + @Override + public LongFunction createOpFunc() { + return l -> new Neo4JReadTxnOp( + spaceFunc.apply(l).getDriver().session(AsyncSession.class), + queryFunc.apply(l) + ); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java new file mode 100644 index 000000000..cfb5d5aea --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/opdispensers/Neo4JWriteTxnOpDispenser.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache 
License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.opdispensers; + +import io.nosqlbench.adapter.neo4j.Neo4JDriverAdapter; +import io.nosqlbench.adapter.neo4j.Neo4JSpace; +import io.nosqlbench.adapter.neo4j.ops.Neo4JWriteTxnOp; +import io.nosqlbench.adapter.neo4j.types.Neo4JOpType; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.neo4j.driver.async.AsyncSession; + +import java.util.function.LongFunction; + + +public class Neo4JWriteTxnOpDispenser extends Neo4JBaseOpDispenser { + + public Neo4JWriteTxnOpDispenser(Neo4JDriverAdapter adapter, ParsedOp op, LongFunction spaceFunc, String requiredTemplateKey) { + super(adapter, op, spaceFunc, requiredTemplateKey); + } + + @Override + public LongFunction createOpFunc() { + return l -> new Neo4JWriteTxnOp( + spaceFunc.apply(l).getDriver().session(AsyncSession.class), + queryFunc.apply(l) + ); + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java new file mode 100644 index 000000000..f2761a02f --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/NBExecutionException.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Unchecked stand-in for an {@link ExecutionException}. Message, stack trace and cause are
 * all answered from the wrapped exception rather than from this instance.
 */
public class NBExecutionException extends RuntimeException {
    private final ExecutionException wrapped;

    /**
     * @param e the execution exception to wrap
     */
    public NBExecutionException(ExecutionException e) {
        wrapped = e;
    }

    /**
     * @return the cause of the wrapped exception (not the wrapped exception itself)
     */
    @Override
    public synchronized Throwable getCause() {
        return wrapped.getCause();
    }

    /**
     * @return the stack trace of the wrapped exception
     */
    @Override
    public StackTraceElement[] getStackTrace() {
        return wrapped.getStackTrace();
    }

    /**
     * @return the wrapped exception's message, prefixed with {@code "Wrapped Exception: "}
     */
    @Override
    public String getMessage() {
        final String inner = wrapped.getMessage();
        return "Wrapped Exception: " + inner;
    }
}
/**
 * Unchecked stand-in for an {@link InterruptedException}. Message, stack trace and cause are
 * all answered from the wrapped exception rather than from this instance.
 */
public class NBInterruptedException extends RuntimeException {
    private final InterruptedException wrapped;

    /**
     * @param e the interrupted exception to wrap
     */
    public NBInterruptedException(InterruptedException e) {
        wrapped = e;
    }

    /**
     * @return the cause of the wrapped exception (not the wrapped exception itself)
     */
    @Override
    public synchronized Throwable getCause() {
        return wrapped.getCause();
    }

    /**
     * @return the stack trace of the wrapped exception
     */
    @Override
    public StackTraceElement[] getStackTrace() {
        return wrapped.getStackTrace();
    }

    /**
     * @return the wrapped exception's message, prefixed with {@code "Wrapped Exception: "}
     */
    @Override
    public String getMessage() {
        final String inner = wrapped.getMessage();
        return "Wrapped Exception: " + inner;
    }
}
/**
 * Unchecked stand-in for a {@link TimeoutException}. Message, stack trace and cause are
 * all answered from the wrapped exception rather than from this instance.
 */
public class NBTimeoutException extends RuntimeException {
    private final TimeoutException wrapped;

    /**
     * @param e the timeout exception to wrap
     */
    public NBTimeoutException(TimeoutException e) {
        wrapped = e;
    }

    /**
     * @return the cause of the wrapped exception (not the wrapped exception itself)
     */
    @Override
    public synchronized Throwable getCause() {
        return wrapped.getCause();
    }

    /**
     * @return the stack trace of the wrapped exception
     */
    @Override
    public StackTraceElement[] getStackTrace() {
        return wrapped.getStackTrace();
    }

    /**
     * @return the wrapped exception's message, prefixed with {@code "Wrapped Exception: "}
     */
    @Override
    public String getMessage() {
        final String inner = wrapped.getMessage();
        return "Wrapped Exception: " + inner;
    }
}
+ */ + +package io.nosqlbench.adapter.neo4j.ops; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Neo4JAutoCommitOp extends Neo4JBaseOp { + + public Neo4JAutoCommitOp(AsyncSession session, Query query) { + super(session, query); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#runAsync(java.lang.String,java.util.Map,org.neo4j.driver.TransactionConfig) + */ + @Override + public final Record[] apply(long value) { + try { + CompletionStage> resultStage = session.runAsync(query).thenComposeAsync( + cursor -> cursor.listAsync().whenComplete( + (records, throwable) -> { + if (throwable != null) { + session.closeAsync(); + } + } + ) + ); + List recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS); + return recordList.toArray(new Record[recordList.size()]); + } catch (ExecutionException exe) { + Throwable ee = exe.getCause(); + if (ee instanceof RuntimeException re) { + throw re; + } else throw new NBExecutionException(exe); + } catch (InterruptedException ie) { + throw new NBInterruptedException(ie); + } catch (TimeoutException e) { + throw new NBTimeoutException(e); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java new file mode 100644 index 000000000..261b5cf19 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JBaseOp.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + + +public abstract class Neo4JBaseOp implements CycleOp { + + protected final AsyncSession session; + protected final Query query; + + public Neo4JBaseOp(AsyncSession session, Query query) { + this.session = session; + this.query = query; + } + + /** + * In the child classes, this method will be responsible for: + * - using the Neo4J AsyncSession object to run the Neo4J Query + * - process the Result to get an array of Records + * - close the AsyncSession + * - Return the array of Records + * + * Session creation and closing is considered light-weight. 
Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/Session.html#close() + */ + public abstract Record[] apply(long value); + + @Override + public String toString() { + return "Neo4JBaseOp(" + query.toString().getClass().getSimpleName() + ")"; + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java new file mode 100644 index 000000000..861e1d33d --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JReadTxnOp.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.nosqlbench.adapter.neo4j.ops; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Neo4JReadTxnOp extends Neo4JBaseOp{ + + public Neo4JReadTxnOp(AsyncSession session, Query query) { + super(session, query); + } + + /** + * Reference: + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeReadAsync(org.neo4j.driver.async.AsyncTransactionCallback) + */ + @Override + public final Record[] apply(long value) { + try { + CompletionStage> resultStage = session.executeReadAsync( + txn -> txn.runAsync(query).thenComposeAsync( + cursor -> cursor.listAsync().whenComplete( + (records, throwable) -> { + if (throwable != null) { + session.closeAsync(); + } + } + ) + ) + ); + List recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS); + return recordList.toArray(new Record[recordList.size()]); + } catch (ExecutionException exe) { + Throwable ee = exe.getCause(); + if (ee instanceof RuntimeException re) { + throw re; + } else throw new NBExecutionException(exe); + } catch (InterruptedException ie) { + throw new NBInterruptedException(ie); + } catch (TimeoutException e) { + throw new NBTimeoutException(e); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java new file mode 100644 index 000000000..1fec09fd6 --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/ops/Neo4JWriteTxnOp.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.ops; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.async.AsyncSession; + +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class Neo4JWriteTxnOp extends Neo4JBaseOp{ + + public Neo4JWriteTxnOp(AsyncSession session, Query query) { + super(session, query); + } + + /** + * References: + * - https://neo4j.com/docs/java-manual/current/async/ + * - https://neo4j.com/docs/api/java-driver/current/org.neo4j.driver/org/neo4j/driver/async/AsyncSession.html#executeWriteAsync(org.neo4j.driver.async.AsyncTransactionCallback) + */ + @Override + public final Record[] apply(long value) { + try { + CompletionStage> resultStage = session.executeWriteAsync( + txn -> txn.runAsync(query).thenComposeAsync( + cursor -> cursor.listAsync().whenComplete( + (records, throwable) -> { + if (throwable != null) { + session.closeAsync(); + } + } + ) + ) + ); + List recordList = resultStage.toCompletableFuture().get(300, TimeUnit.SECONDS); + return recordList.toArray(new Record[recordList.size()]); + } catch (ExecutionException exe) { + Throwable ee = exe.getCause(); + if (ee instanceof RuntimeException re) { + throw re; + } else throw new NBExecutionException(exe); + } catch (InterruptedException ie) { + throw new NBInterruptedException(ie); + } catch (TimeoutException e) { + throw new 
NBTimeoutException(e); + } + } +} diff --git a/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java new file mode 100644 index 000000000..8c5040ade --- /dev/null +++ b/adapter-neo4j/src/main/java/io/nosqlbench/adapter/neo4j/types/Neo4JOpType.java @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.neo4j.types; + +public enum Neo4JOpType { + + autocommit("autocommit"), + + read_transaction("read_transaction"), + + write_transaction("write_transaction"); + + private final String value; + + Neo4JOpType(String value) { + this.value = value; + } + + public String getValue(){ + return value; + } +} diff --git a/adapter-neo4j/src/main/resources/activities/neo4j.yaml b/adapter-neo4j/src/main/resources/activities/neo4j.yaml new file mode 100644 index 000000000..825d8c05b --- /dev/null +++ b/adapter-neo4j/src/main/resources/activities/neo4j.yaml @@ -0,0 +1,99 @@ +min_version: 5.21.1 +description: | + Sample Neo4J Driver workload for ANN. 
Responsible for resetting/creating schema, ingesting vector + data from HDF5 file format, and then performing ANN queries against the ingested data + + Template Variables: + TEMPLATE(datafile) + TEMPLATE(node_label,Node) + TEMPLATE(k,100) + TEMPLATE(batch_size) + TEMPLATE(delete_batch_size,5000) + +bindings: + id: ToString() + id_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),long->ToString()); + train_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"); + train_vector_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train")); + test_vector: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test"); + relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors") + +blocks: + # TODO: Node deletion times out; attempt this in future: CREATE OR REPLACE DATABASE neo4j + reset-schema: + ops: + # Reference: https://support.neo4j.com/s/article/360059882854-Deleting-large-numbers-of-nodes#h_01H95CXNJ8TN4126T3Y01BRWKS + delete_nodes: + autocommit: | + MATCH (n) + CALL { WITH n + DETACH DELETE n + } IN TRANSACTIONS OF $delete_batch_size ROWS; + query_params: + delete_batch_size: TEMPLATE(delete_batch_size,5000) + drop_index: + autocommit: DROP INDEX $index_name IF EXISTS + query_params: + index_name: vector_index + + schema: + ops: + create_vector_index: + autocommit: | + CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node)) + ON (n.embedding) OPTIONS + {indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}} + query_params: + index_name: vector_index + dimension: TEMPLATE(dimension) + similarity_function: TEMPLATE(similarity_function,cosine) + + rampup: + ops: + insert_node: + write_transaction: | + CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector}) + query_params: + id: '{id}' + vector: '{train_vector}' + + rampup-batch: + ops: + # 
Reference: https://community.neo4j.com/t/unwind-multiple-arrays-to-set-property/59908/5 + insert_nodes: + write_transaction: | + WITH $id_list as ids, $vector_list as vectors + UNWIND RANGE(0, size(ids) - 1) as idx + CREATE (v:TEMPLATE(node_label,Node) {id: ids[idx], embedding: vectors[idx]}) + query_params: + id_list: '{id_batch}' + vector_list: '{train_vector_batch}' + + search: + ops: + search: + read_transaction: | + WITH $query_vector AS queryVector + CALL db.index.vector.queryNodes($index_name, $k, queryVector) + YIELD node + RETURN node.id + query_params: + query_vector: '{test_vector}' + index_name: vector_index + k: TEMPLATE(k,100) + verifier-init: | + relevancy = new io.nosqlbench.nb.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op); + for (int k in List.of(100)) { + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k)); + } + verifier: | + // result is a Record[] + values = io.nosqlbench.adapter.neo4j.Neo4JAdapterUtils.getFieldForAllRecords(result, "node.id") + ann = values.collect { it.toString().toInteger() }.toArray(new Integer[values.size()]) + knn = {relevant_indices} + relevancy.accept(knn, ann); + return true; diff --git a/adapter-neo4j/src/main/resources/neo4j.md b/adapter-neo4j/src/main/resources/neo4j.md new file mode 100644 index 000000000..0a49784a5 --- /dev/null +++ b/adapter-neo4j/src/main/resources/neo4j.md @@ -0,0 +1,61 @@ +# neo4j driver adapter + +The neo4j driver adapter is a nb adapter for the Neo4J driver, an open 
source Java driver for connecting to and +performing operations on an instance of a Neo4J/Aura database. The driver is hosted on GitHub at +https://github.com/neo4j/neo4j-java-driver. + +## activity parameters + +The following parameters must be supplied to the adapter at runtime in order to successfully connect to an +instance of the Neo4J/Aura database: + +* db_uri - the URI for the Neo4J instance for the driver to connect to. + +## Op Templates + +The Neo4J adapter supports three different op types: +- autocommit +- read_transaction +- write_transaction + +A good reference for when to use each is located at https://neo4j.com/docs/driver-manual/1.7/sessions-transactions/ + +For these different op types, users can specify appropriate Cypher queries to run against the database. + + +## Examples +All examples provided are in the scope of leveraging Neo4J's vector index capabilities. Although +arbitrary Cypher queries can be run for more involved graph modeling use cases, only simple +vector search functionality has been properly worked through so far. 
+ + +```yaml +ops: + example_create_vector_index: + autocommit: | + CREATE VECTOR INDEX $index_name IF NOT EXISTS FOR (n:TEMPLATE(node_label,Node)) + ON (n.embedding) OPTIONS + {indexConfig: {`vector.dimensions`: $dimension, `vector.similarity_function`: $similarity_function}} + query_params: + index_name: vector_index + dimension: TEMPLATE(dimension) + similarity_function: TEMPLATE(similarity_function,cosine) + + example_insert_node: + write_transaction: | + CREATE (v:TEMPLATE(node_label,Node) {id: $id, embedding: $vector}) + query_params: + id: '{id}' + vector: '{train_vector}' + + example_search: + read_transaction: | + WITH $query_vector AS queryVector + CALL db.index.vector.queryNodes($index_name, $k, queryVector) + YIELD node + RETURN node.id + query_params: + query_vector: '{test_vector}' + index_name: vector_index + k: TEMPLATE(k,100) +``` diff --git a/mvn-defaults/pom.xml b/mvn-defaults/pom.xml index 199da878e..a51a77576 100644 --- a/mvn-defaults/pom.xml +++ b/mvn-defaults/pom.xml @@ -362,7 +362,7 @@ org.graalvm.polyglot polyglot - 23.1.0 + 23.1.2 org.graalvm.polyglot diff --git a/nb-adapters/adapter-dynamodb/pom.xml b/nb-adapters/adapter-dynamodb/pom.xml index 766f2a061..d805e6025 100644 --- a/nb-adapters/adapter-dynamodb/pom.xml +++ b/nb-adapters/adapter-dynamodb/pom.xml @@ -43,7 +43,7 @@ com.amazonaws aws-java-sdk-dynamodb - 1.12.678 + 1.12.681 diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java index 04c320c82..87badc54a 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIO.java @@ -43,6 +43,8 @@ public class NBIO implements NBPathsAPI.Facets { private static String[] globalIncludes = new String[0]; + private static boolean useNBIOCache; + public synchronized static void addGlobalIncludes(String[] globalIncludes) { NBIO.globalIncludes = globalIncludes; } @@ -158,12 +160,25 
@@ public class NBIO implements NBPathsAPI.Facets { return this; } + /** + * {@inheritDoc} + */ + @Override + public NBPathsAPI.GetPrefixes cachedContent() { + this.resolver = URIResolvers.inNBIOCache(); + return this; + } + /** * {@inheritDoc} */ @Override public NBPathsAPI.GetPrefixes allContent() { - this.resolver = URIResolvers.inFS().inCP().inURLs(); + if (useNBIOCache) { + this.resolver = URIResolvers.inFS().inCP().inNBIOCache(); + } else { + this.resolver = URIResolvers.inFS().inCP().inURLs(); + } return this; } @@ -343,6 +358,14 @@ public class NBIO implements NBPathsAPI.Facets { return new NBIO().remoteContent(); } + /** + * Return content from the NBIO cache. If the content is not in the cache look for it in the given + * URL and put it in the cache. + * + * @return this builder + */ + public static NBPathsAPI.GetPrefixes cached() { return new NBIO().cachedContent(); } + /** * {@inheritDoc} @@ -628,4 +651,13 @@ public class NBIO implements NBPathsAPI.Facets { ", extensionSets=" + extensionSets + '}'; } + + public boolean useNBIOCache() { + return useNBIOCache; + } + + public static void setUseNBIOCache(boolean wantsToUseNBIOCache) { + useNBIOCache = wantsToUseNBIOCache; + } + } diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java new file mode 100644 index 000000000..7f3e338bc --- /dev/null +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBIOResolverConditions.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.nb.api.nbio; + +public enum NBIOResolverConditions { + UPDATE_AND_VERIFY, + UPDATE_NO_VERIFY, + LOCAL_VERIFY, + LOCAL_NO_VERIFY +} diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java index d3bb76431..75e65af02 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/NBPathsAPI.java @@ -67,6 +67,14 @@ public interface NBPathsAPI { */ GetPrefixes fileContent(); + /** + * Return content from the NBIO cache. If the content is not in the cache look for it in the given + * URL and put it in the cache. + * + * @return this builder + */ + GetPrefixes cachedContent(); + /** * Return content from everywhere, from remote URls, or from the file system and then the internal * bundled content if not found in the file system first. 
diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java index b2d659e5a..003aaa436 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForClasspath.java @@ -101,9 +101,11 @@ public class ResolverForClasspath implements ContentResolver { public List resolveDirectory(URI uri) { List path = resolvePaths(uri); List dirs = new ArrayList<>(); - for (Path dirpath : path) { - if (Files.isDirectory(dirpath)) { - dirs.add(dirpath); + if (path != null) { + for (Path dirpath : path) { + if (Files.isDirectory(dirpath)) { + dirs.add(dirpath); + } } } return dirs; diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java new file mode 100644 index 000000000..52355e9d3 --- /dev/null +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/ResolverForNBIOCache.java @@ -0,0 +1,329 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package io.nosqlbench.nb.api.nbio; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.*; +import java.net.HttpURLConnection; +import java.net.URI; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; + +public class ResolverForNBIOCache implements ContentResolver { + public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache(); + private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class); + private static String cacheDir = System.getProperty("user.home") + "/.nosqlbench/nbio-cache/"; + private static boolean forceUpdate = false; + private static boolean verifyChecksum = true; + private static int maxRetries = 3; + + @Override + public List> resolve(URI uri) { + List> contents = new ArrayList<>(); + Path path = resolvePath(uri); + + if (path != null) { + contents.add(new PathContent(path)); + } + return contents; + } + + /** + * This method is used to resolve the path of a given URI. + * It first checks if the URI has a scheme (http or https) and if it does, it tries to resolve the path from the cache. + * If the file is not in the cache, it tries to download it from the remote URL. + * If the URI does not have a scheme, it returns null. 
+ * + * @param uri the URI to resolve the path for + * @return the resolved Path object, or null if the URI does not have a scheme or the path could not be resolved + */ + private Path resolvePath(URI uri) { + if (uri.getScheme() != null && !uri.getScheme().isEmpty() && + (uri.getScheme().equalsIgnoreCase("http") || + uri.getScheme().equalsIgnoreCase("https"))) { + Path cachedFilePath = Path.of(cacheDir + uri.getPath()); + if (Files.isReadable(cachedFilePath)) { + return pathFromLocalCache(cachedFilePath, uri); + } + else { + return pathFromRemoteUrl(uri); + } + } + return null; + } + + private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) { + int retries = 0; + boolean success = false; + while (retries < maxRetries) { + try { + if (this.remoteFileExists(uri)) { + logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachedFilePath); + ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream()); + FileOutputStream outputStream = new FileOutputStream(cachedFilePath.toFile()); + outputStream.getChannel().transferFrom(channel, 0, Long.MAX_VALUE); + outputStream.close(); + channel.close(); + logger.info(() -> "Downloaded remote file to cache at " + cachedFilePath); + if(checksum == null || verifyChecksum(cachedFilePath, checksum)) { + success = true; + break; + } + } else { + logger.error(() -> "Error downloading remote file to cache at " + cachedFilePath + ", retrying..."); + retries++; + } + } catch (IOException e) { + logger.error(() -> "Error downloading remote file to cache at " + cachedFilePath + ", retrying..."); + retries++; + } + } + return success; + } + + private boolean verifyChecksum(Path cachedFilePath, URLContent checksum) { + try { + String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString()); + Path checksumPath = checksumPath(cachedFilePath); + Files.writeString(checksumPath, localChecksumStr); + logger.debug(() -> "Generated local checksum and saved to cache at " + 
checksumPath); + String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes())); + if (localChecksumStr.equals(remoteChecksum)) { + return true; + } else { + logger.warn(() -> "checksums do not match for " + checksumPath + " and " + checksum); + return false; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private static String stripControlCharacters(String input) { + return input.replaceAll("[\\p{Cntrl}]+$", ""); + } + + /** + * This method is used to download a file from a remote URL and store it in a local cache. + * It first creates the cache directory if it doesn't exist. + * Then it tries to download the file and if successful, it generates a SHA256 checksum for the downloaded file. + * It then compares the generated checksum with the remote checksum. + * If the checksums match, it returns the path to the cached file. + * If the checksums don't match or if there was an error during the download, it cleans up the cache and throws a RuntimeException. 
+ * + * @param uri the URI of the remote file to download + * @return the Path to the downloaded file in the local cache + * @throws RuntimeException if there was an error during the download or if the checksums don't match + */ + private Path pathFromRemoteUrl(URI uri) { + Path cachedFilePath = Path.of(cacheDir + uri.getPath()); + createCacheDir(cachedFilePath); + if (!verifyChecksum) { + return execute(NBIOResolverConditions.UPDATE_NO_VERIFY, cachedFilePath, uri); + } + else { + return execute(NBIOResolverConditions.UPDATE_AND_VERIFY, cachedFilePath, uri); + } + } + + private void createCacheDir(Path cachedFilePath) { + Path dir = cachedFilePath.getParent(); + if (!Files.exists(dir)) { + try { + Files.createDirectories(dir); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + private void cleanupCache(Path cachedFilePath) { + if (!cachedFilePath.toFile().delete()) + logger.warn(() -> "Could not delete cached file " + cachedFilePath); + Path checksumPath = checksumPath(cachedFilePath); + if (!checksumPath.toFile().delete()) + logger.warn(() -> "Could not delete cached checksum " + checksumPath); + } + + private Path execute(NBIOResolverConditions condition, Path cachedFilePath, URI uri) { + String remoteChecksumFileStr = uri.getPath() + ".sha256"; + URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr))); + switch(condition) { + case UPDATE_AND_VERIFY: + if (checksum == null) { + logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist. 
Proceeding without verification"); + } + if (downloadFile(uri, cachedFilePath, checksum)) { + return cachedFilePath; + } else { + throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath); + } + case UPDATE_NO_VERIFY: + logger.warn(() -> "Checksum verification is disabled, downloading remote file to cache at " + cachedFilePath); + if (downloadFile(uri, cachedFilePath, null)) { + return cachedFilePath; + } else { + throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath); + } + case LOCAL_VERIFY: + if (checksum == null) { + logger.warn(() -> "Remote checksum file does not exist, returning cached file " + cachedFilePath); + return cachedFilePath; + } + try { + String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath)); + String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes())); + if (localChecksum.equals(remoteChecksum)) { + return cachedFilePath; + } + else { + logger.warn(() -> "Checksums do not match, rehydrating cache " + cachedFilePath); + return pathFromRemoteUrl(uri); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + case LOCAL_NO_VERIFY: + return cachedFilePath; + default: + throw new RuntimeException("Invalid NBIO Cache condition"); + } + } + + /** + * This method is used to retrieve a file from the local cache. + * It first checks if the file exists in the cache and if a checksum file is present. + * If the checksum file is not present, it generates a new one. + * If the "force update" option is enabled, it deletes the cached file and downloads it from the remote URL. + * If the "checksum verification" option is enabled, it compares the local checksum with the remote checksum. + * If the checksums match, it returns the path to the cached file. + * If the checksums don't match, it deletes the cached file and downloads it from the remote URL. 
+ * If the remote file or checksum does not exist, it returns the cached file. + * + * @param cachedFilePath the Path to the cached file + * @param uri the URI of the remote file + * @return the Path to the cached file + * @throws RuntimeException if there was an error during the checksum comparison or if the checksums don't match + */ + private Path pathFromLocalCache(Path cachedFilePath, URI uri) { + + if (forceUpdate) { + return pathFromRemoteUrl(uri); + } + if (!verifyChecksum) { + logger.warn(() -> "Checksum verification is disabled, returning cached file " + cachedFilePath); + return execute(NBIOResolverConditions.LOCAL_NO_VERIFY, cachedFilePath, uri); + } else { + return execute(NBIOResolverConditions.LOCAL_VERIFY, cachedFilePath, uri); + } + + } + + private Path getOrCreateChecksum(Path cachedFilePath) { + Path checksumPath = checksumPath(cachedFilePath); + if (!Files.isReadable(checksumPath)) { + try { + Files.writeString(checksumPath, generateSHA256Checksum(cachedFilePath.toString())); + } catch (IOException | NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } + return checksumPath; + } + + private Path checksumPath(Path cachedFilePath) { + return Path.of(cachedFilePath + ".sha256"); + } + + private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + try (InputStream is = new FileInputStream(filePath)) { + byte[] buffer = new byte[8192]; + int bytesRead; + while ((bytesRead = is.read(buffer)) != -1) { + md.update(buffer, 0, bytesRead); + } + } + byte[] digest = md.digest(); + StringBuilder sb = new StringBuilder(); + for (byte b : digest) { + sb.append(String.format("%02x", b)); + } + return sb.toString(); + } + + private URLContent resolveURI(URI uri) { + try { + URL url = uri.toURL(); + InputStream inputStream = url.openStream(); + logger.debug(() -> "Found accessible remote file at " + url); + return new URLContent(url, 
inputStream); + } catch (IOException e) { + logger.error(() -> "Unable to find content at URI '" + uri + "', this often indicates a configuration error."); + return null; + } + } + + private boolean remoteFileExists(URI uri) { + try { + HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection(); + connection.setRequestMethod("HEAD"); + int responseCode = connection.getResponseCode(); + return responseCode == HttpURLConnection.HTTP_OK; + } catch (Exception e) { + return false; // Error occurred or file does not exist + } + } + + @Override + public List resolveDirectory(URI uri) { + List dirs = new ArrayList<>(); + + Path path = Path.of(cacheDir + uri.getPath()); + if (Files.isDirectory(path)) { + dirs.add(path); + } + return dirs; + } + + public static void setCacheDir(String cacheDir) { + ResolverForNBIOCache.cacheDir = cacheDir; + } + + public static void setForceUpdate(boolean forceUpdate) { + ResolverForNBIOCache.forceUpdate = forceUpdate; + } + + public static void setVerifyChecksum(boolean verifyChecksum) { + ResolverForNBIOCache.verifyChecksum = verifyChecksum; + } + + public static void setMaxRetries(int maxRetries) { + ResolverForNBIOCache.maxRetries = maxRetries; + } + +} diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java index 4aff35b69..90e2be2af 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolver.java @@ -36,7 +36,8 @@ public class URIResolver implements ContentResolver { private static final List EVERYWHERE = List.of( ResolverForURL.INSTANCE, ResolverForFilesystem.INSTANCE, - ResolverForClasspath.INSTANCE + ResolverForClasspath.INSTANCE, + ResolverForNBIOCache.INSTANCE ); private List extensions; @@ -87,6 +88,16 @@ public class URIResolver implements ContentResolver { return this; } + /** + * Include resources within the 
NBIO cache or download them if they are not found. + * + * @return this URISearch + */ + public URIResolver inNBIOCache() { + loaders.add(ResolverForNBIOCache.INSTANCE); + return this; + } + public List> resolve(String uri) { return resolve(URI.create(uri)); } diff --git a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java index c1a4c852b..5c08d8eb5 100644 --- a/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java +++ b/nb-apis/nb-api/src/main/java/io/nosqlbench/nb/api/nbio/URIResolvers.java @@ -52,4 +52,8 @@ public class URIResolvers { public static URIResolver inClasspath() { return new URIResolver().inCP(); } + + public static URIResolver inNBIOCache() { + return new URIResolver().inNBIOCache(); + } } diff --git a/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java b/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java index 0561cb615..eca8b8c58 100644 --- a/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java +++ b/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java @@ -49,6 +49,7 @@ import io.nosqlbench.engine.core.logging.NBLoggerConfig; import io.nosqlbench.engine.core.metadata.MarkdownFinder; import io.nosqlbench.nb.annotations.Service; import io.nosqlbench.nb.annotations.ServiceSelector; +import io.nosqlbench.nb.api.nbio.ResolverForNBIOCache; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.config.ConfigurationFactory; @@ -220,6 +221,26 @@ public class NBCLI implements Function, NBLabeledElement { NBCLI.logger = LogManager.getLogger("NBCLI"); NBIO.addGlobalIncludes(options.wantsIncludes()); + NBIO.setUseNBIOCache(options.wantsToUseNBIOCache()); + if(options.wantsToUseNBIOCache()) { + logger.info(() -> "Configuring options for NBIO Cache"); + logger.info(() -> "Setting NBIO Cache Force 
Update to " + options.wantsNbioCacheForceUpdate()); + ResolverForNBIOCache.setForceUpdate(options.wantsNbioCacheForceUpdate()); + logger.info(() -> "Setting NBIO Cache Verify Checksum to " + options.wantsNbioCacheVerify()); + ResolverForNBIOCache.setVerifyChecksum(options.wantsNbioCacheVerify()); + if (options.getNbioCacheDir() != null) { + logger.info(() -> "Setting NBIO Cache directory to " + options.getNbioCacheDir()); + ResolverForNBIOCache.setCacheDir(options.getNbioCacheDir()); + } + if (options.getNbioCacheMaxRetries() != null) { + try { + ResolverForNBIOCache.setMaxRetries(Integer.parseInt(options.getNbioCacheMaxRetries())); + logger.info(() -> "Setting NBIO Cache max retries to " + options.getNbioCacheMaxRetries()); + } catch (NumberFormatException e) { + logger.error("Invalid value for nbio-cache-max-retries: " + options.getNbioCacheMaxRetries()); + } + } + } if (options.wantsBasicHelp()) { System.out.println(this.loadHelpFile("basic.md")); diff --git a/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java b/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java index 8b7d4de32..dd2970ee4 100644 --- a/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java +++ b/nb-engine/nb-engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLIOptions.java @@ -137,6 +137,11 @@ public class NBCLIOptions { private static final String DEFAULT_CONSOLE_PATTERN = "TERSE"; private static final String DEFAULT_LOGFILE_PATTERN = "VERBOSE"; private final static String ENABLE_DEDICATED_VERIFICATION_LOGGER = "--enable-dedicated-verification-logging"; + private final static String USE_NBIO_CACHE = "--use-nbio-cache"; + private final static String NBIO_CACHE_FORCE_UPDATE = "--nbio-cache-force-update"; + private final static String NBIO_CACHE_NO_VERIFY = "--nbio-cache-no-verify"; + private final static String NBIO_CACHE_DIR = "--nbio-cache-dir"; + private final static String NBIO_CACHE_MAX_RETRIES = 
"--nbio-cache-max-retries"; // private static final String DEFAULT_CONSOLE_LOGGING_PATTERN = "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"; @@ -206,6 +211,11 @@ public class NBCLIOptions { private String metricsLabelSpec = ""; private String wantsToCatResource = ""; private long heartbeatIntervalMs = 10000; + private boolean useNBIOCache = false; + private boolean nbioCacheForceUpdate = false; + private boolean nbioCacheVerify = true; + private String nbioCacheDir; + private String nbioCacheMaxRetries; public boolean wantsLoggedMetrics() { return this.wantsConsoleMetrics; @@ -651,6 +661,26 @@ public class NBCLIOptions { this.heartbeatIntervalMs = Long.parseLong(this.readWordOrThrow(arglist, "heartbeat interval in ms")); break; + case USE_NBIO_CACHE: + arglist.removeFirst(); + this.useNBIOCache = true; + break; + case NBIO_CACHE_FORCE_UPDATE: + arglist.removeFirst(); + this.nbioCacheForceUpdate = true; + break; + case NBIO_CACHE_NO_VERIFY: + arglist.removeFirst(); + this.nbioCacheVerify = false; + break; + case NBCLIOptions.NBIO_CACHE_DIR: + arglist.removeFirst(); + this.nbioCacheDir = this.readWordOrThrow(arglist, "a NBIO cache directory"); + break; + case NBIO_CACHE_MAX_RETRIES: + arglist.removeFirst(); + this.nbioCacheMaxRetries = this.readWordOrThrow(arglist, "the maximum number of attempts to fetch a resource from the cache"); + break; default: nonincludes.addLast(arglist.removeFirst()); } @@ -812,6 +842,21 @@ public class NBCLIOptions { public NBLogLevel getConsoleLogLevel() { return this.consoleLevel; } + public boolean wantsToUseNBIOCache() { + return this.useNBIOCache; + } + public boolean wantsNbioCacheForceUpdate() { + return nbioCacheForceUpdate; + } + public boolean wantsNbioCacheVerify() { + return nbioCacheVerify; + } + public String getNbioCacheDir() { + return nbioCacheDir; + } + public String getNbioCacheMaxRetries() { + return nbioCacheMaxRetries; + } private String readWordOrThrow(final LinkedList arglist, final String required) { if 
(null == arglist.peekFirst()) diff --git a/nb-virtdata/virtdata-lib-vectors/pom.xml b/nb-virtdata/virtdata-lib-vectors/pom.xml index 02707097e..a19f2f6eb 100644 --- a/nb-virtdata/virtdata-lib-vectors/pom.xml +++ b/nb-virtdata/virtdata-lib-vectors/pom.xml @@ -47,12 +47,46 @@ ${revision} - - org.apfloat - apfloat - 1.13.0 - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +