diff --git a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/HdfLoader.java b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/HdfLoader.java index d4c05a85d..3749e7802 100644 --- a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/HdfLoader.java +++ b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/HdfLoader.java @@ -42,6 +42,7 @@ public class HdfLoader { } try { LoaderConfig config = new LoaderConfig(args[0]); + logger.info("Starting loader with config: " + config); HdfReader reader = null; VectorWriter writer = null; @@ -51,7 +52,10 @@ public class HdfLoader { logger.info("HDF4 format not yet supported"); System.exit(1); } - case HDF5 -> reader = new Hdf5Reader(config); + case HDF5 -> { + logger.info("HDF5 format selected"); + reader = new Hdf5Reader(config); + } default -> { logger.info("Unknown format: " + format); System.exit(1); @@ -59,6 +63,7 @@ public class HdfLoader { } String writerType = config.getWriter(); + logger.info("Using writer type: " + writerType); switch (writerType.toLowerCase()) { case FILEWRITER -> writer = new FileVectorWriter(config); case ASTRA -> writer = new AstraVectorWriter(config); @@ -69,6 +74,7 @@ public class HdfLoader { } } reader.setWriter(writer); + logger.info("Starting main read loop"); reader.read(); } catch (Exception e) { logger.error(e); diff --git a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/readers/Hdf5Reader.java b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/readers/Hdf5Reader.java index bbca714c0..232207919 100644 --- a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/readers/Hdf5Reader.java +++ b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/readers/Hdf5Reader.java @@ -46,6 +46,7 @@ public class Hdf5Reader implements HdfReader { private final ExecutorService executorService; private final LinkedBlockingQueue queue; private List datasets; + private final float[] SHUTDOWN = new float[0]; public Hdf5Reader(LoaderConfig config) { this.config = config; executorService = Executors.newCachedThreadPool(); @@ -79,7 +80,7 @@ public class Hdf5Reader implements HdfReader { extractDatasets(hdfFile); } List> futures = new ArrayList<>(); - Future writerFuture = executorService.submit(writer); + executorService.submit(writer); for (String ds : datasets) { if (ds.equalsIgnoreCase(ALL)) { continue; @@ -92,6 +93,7 @@ public class Hdf5Reader implements HdfReader { EmbeddingGenerator generator = getGenerator(type); Object data; if (dataset.getSizeInBytes() > Integer.MAX_VALUE) { + logger.info("slicing large dataset: " + ds); // TODO: For now this will be implemented to handle numeric types with // 2 dimensions where the 1st dimension is the number of vectors and the 2nd // dimension is the number of dimensions in the vector. @@ -116,9 +118,7 @@ public class Hdf5Reader implements HdfReader { } else { data = dataset.getData(); float[][] vectors = generator.generateEmbeddingFrom(data, dims); - int i = 1; for (float[] vector : vectors) { - i++; try { queue.put(vector); } catch (InterruptedException e) { @@ -138,6 +138,11 @@ public class Hdf5Reader implements HdfReader { } hdfFile.close(); writer.shutdown(); + try { + queue.put(SHUTDOWN); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } executorService.shutdown(); } } diff --git a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/AstraVectorWriter.java b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/AstraVectorWriter.java index a272ad1ef..120567af8 100644 --- a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/AstraVectorWriter.java +++ b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/AstraVectorWriter.java @@ -21,12 +21,15 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.datastax.oss.driver.api.core.cql.PreparedStatement; import com.datastax.oss.driver.api.core.data.CqlVector; import io.nosqlbench.loader.hdf.config.LoaderConfig; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.nio.file.Paths; import java.util.Map; public class AstraVectorWriter extends AbstractVectorWriter { - private CqlSession session; + private static final Logger logger = LogManager.getLogger(AstraVectorWriter.class); + private final CqlSession session; PreparedStatement insert_vector; public AstraVectorWriter(LoaderConfig config) { @@ -36,9 +39,10 @@ public class AstraVectorWriter extends AbstractVectorWriter { .withAuthCredentials(astraParams.get("clientId"), astraParams.get("clientSecret")) .withKeyspace(astraParams.get("keyspace")) .build(); + logger.info("Astra session initialized"); insert_vector = session.prepare(astraParams.get("query")); } - +//TODO: this is insanely slow. Needs work on threading/batching @Override protected void writeVector(float[] vector) { Float[] vector2 = new Float[vector.length]; @@ -46,7 +50,7 @@ public class AstraVectorWriter extends AbstractVectorWriter { vector2[i] = vector[i]; } CqlVector.Builder vectorBuilder = CqlVector.builder(); - vectorBuilder.add((Object[]) vector2); + vectorBuilder.add(vector2); session.execute(insert_vector.bind(getPartitionValue(vector), vectorBuilder.build())); } diff --git a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/FileVectorWriter.java b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/FileVectorWriter.java index 78e327d8e..a1eacdb3f 100644 --- a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/FileVectorWriter.java +++ b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/FileVectorWriter.java @@ -29,6 +29,7 @@ public class FileVectorWriter extends AbstractVectorWriter { public FileVectorWriter(LoaderConfig config) throws IOException { String targetFileName = config.getTargetFile(); targetFile = new BufferedWriter(new FileWriter(targetFileName)); + logger.info("Writing to file: " + targetFileName); } @Override diff --git a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/NoopVectorWriter.java b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/NoopVectorWriter.java index afb0789f4..15e62f067 100644 --- a/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/NoopVectorWriter.java +++ b/hdf-loader/src/main/java/io/nosqlbench/loader/hdf/writers/NoopVectorWriter.java @@ -17,13 +17,16 @@ package io.nosqlbench.loader.hdf.writers; -import java.util.concurrent.LinkedBlockingQueue; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class NoopVectorWriter extends AbstractVectorWriter { + private static final Logger logger = LogManager.getLogger(NoopVectorWriter.class); @Override protected void writeVector(float[] vector) { //No-op + logger.debug(vector); } @Override diff --git a/hdf-loader/src/main/resources/config.yaml b/hdf-loader/src/main/resources/config.yaml index c8e2e1ea5..d7c1c9dfd 100644 --- a/hdf-loader/src/main/resources/config.yaml +++ b/hdf-loader/src/main/resources/config.yaml @@ -3,11 +3,11 @@ sourceFile: /home/mwolters138/Documents/hdf5/datasets/pass/glove-25-angular.hdf5 datasets: - all embedding: word2vec -writer: noop +writer: filewriter astra: scb: /home/mwolters138/Dev/testing/secure-connect-vector-correctness.zip clientId: IvpdaZejwNuvWeupsIkWTHeL clientSecret: .bxut2-OQL,dWunZeQbjZC0vMHd88UWXKS.xT,nl95zQC0B0xU9FzSWK3HSUGO11o_7pr7wG7+EMaZqegkKlr4fZ54__furPMtWPGiPp,2cZ1q15vrWwc9_-AcgeCbuf - keyspace: baselines768dot - query: INSERT INTO vectors(key, value) VALUES (?,?) + keyspace: baselines128dot + query: INSERT INTO vectors25(key, value) VALUES (?,?) targetFile: /home/mwolters138/vectors.txt diff --git a/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_vector/HdfFileToVectorTest.java b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_vector/HdfFileToVectorTest.java new file mode 100644 index 000000000..28dcfbc30 --- /dev/null +++ b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_vector/HdfFileToVectorTest.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2023 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.virtdata.library.basics.shared.from_long.to_vector; + +public class HdfFileToVectorTest { +}