Code cleanup and fixed an issue with the executable not exiting

This commit is contained in:
Mark Wolters
2023-08-15 10:08:33 -04:00
parent 7109c4d09f
commit 2502a970b6
7 changed files with 51 additions and 11 deletions

View File

@@ -42,6 +42,7 @@ public class HdfLoader {
}
try {
LoaderConfig config = new LoaderConfig(args[0]);
logger.info("Starting loader with config: " + config);
HdfReader reader = null;
VectorWriter writer = null;
@@ -51,7 +52,10 @@ public class HdfLoader {
logger.info("HDF4 format not yet supported");
System.exit(1);
}
case HDF5 -> reader = new Hdf5Reader(config);
case HDF5 -> {
logger.info("HDF5 format selected");
reader = new Hdf5Reader(config);
}
default -> {
logger.info("Unknown format: " + format);
System.exit(1);
@@ -59,6 +63,7 @@ public class HdfLoader {
}
String writerType = config.getWriter();
logger.info("Using writer type: " + writerType);
switch (writerType.toLowerCase()) {
case FILEWRITER -> writer = new FileVectorWriter(config);
case ASTRA -> writer = new AstraVectorWriter(config);
@@ -69,6 +74,7 @@ public class HdfLoader {
}
}
reader.setWriter(writer);
logger.info("Starting main read loop");
reader.read();
} catch (Exception e) {
logger.error(e);

View File

@@ -46,6 +46,7 @@ public class Hdf5Reader implements HdfReader {
private final ExecutorService executorService;
private final LinkedBlockingQueue<float[]> queue;
private List<String> datasets;
private final float[] SHUTDOWN = new float[0];
public Hdf5Reader(LoaderConfig config) {
this.config = config;
executorService = Executors.newCachedThreadPool();
@@ -79,7 +80,7 @@ public class Hdf5Reader implements HdfReader {
extractDatasets(hdfFile);
}
List<Future<?>> futures = new ArrayList<>();
Future<?> writerFuture = executorService.submit(writer);
executorService.submit(writer);
for (String ds : datasets) {
if (ds.equalsIgnoreCase(ALL)) {
continue;
@@ -92,6 +93,7 @@ public class Hdf5Reader implements HdfReader {
EmbeddingGenerator generator = getGenerator(type);
Object data;
if (dataset.getSizeInBytes() > Integer.MAX_VALUE) {
logger.info("slicing large dataset: " + ds);
// TODO: For now this will be implemented to handle numeric types with
// 2 dimensions where the 1st dimension is the number of vectors and the 2nd
// dimension is the number of dimensions in the vector.
@@ -116,9 +118,7 @@ public class Hdf5Reader implements HdfReader {
} else {
data = dataset.getData();
float[][] vectors = generator.generateEmbeddingFrom(data, dims);
int i = 1;
for (float[] vector : vectors) {
i++;
try {
queue.put(vector);
} catch (InterruptedException e) {
@@ -138,6 +138,11 @@ public class Hdf5Reader implements HdfReader {
}
hdfFile.close();
writer.shutdown();
try {
queue.put(SHUTDOWN);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
executorService.shutdown();
}
}

View File

@@ -21,12 +21,15 @@ import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.loader.hdf.config.LoaderConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Paths;
import java.util.Map;
public class AstraVectorWriter extends AbstractVectorWriter {
private CqlSession session;
private static final Logger logger = LogManager.getLogger(AstraVectorWriter.class);
private final CqlSession session;
PreparedStatement insert_vector;
public AstraVectorWriter(LoaderConfig config) {
@@ -36,9 +39,10 @@ public class AstraVectorWriter extends AbstractVectorWriter {
.withAuthCredentials(astraParams.get("clientId"), astraParams.get("clientSecret"))
.withKeyspace(astraParams.get("keyspace"))
.build();
logger.info("Astra session initialized");
insert_vector = session.prepare(astraParams.get("query"));
}
//TODO: this is insanely slow. Needs work on threading/batching
@Override
protected void writeVector(float[] vector) {
Float[] vector2 = new Float[vector.length];
@@ -46,7 +50,7 @@ public class AstraVectorWriter extends AbstractVectorWriter {
vector2[i] = vector[i];
}
CqlVector.Builder vectorBuilder = CqlVector.builder();
vectorBuilder.add((Object[]) vector2);
vectorBuilder.add(vector2);
session.execute(insert_vector.bind(getPartitionValue(vector), vectorBuilder.build()));
}

View File

@@ -29,6 +29,7 @@ public class FileVectorWriter extends AbstractVectorWriter {
/**
 * Creates a writer that appends vectors to the target file named in {@code config}.
 *
 * @param config loader configuration; {@code getTargetFile()} supplies the output path
 * @throws IOException if the target file cannot be opened for writing
 */
public FileVectorWriter(LoaderConfig config) throws IOException {
    String targetFileName = config.getTargetFile();
    // Log the destination BEFORE opening the file so that an IOException
    // thrown by FileWriter is still preceded by context about which path failed.
    logger.info("Writing to file: " + targetFileName);
    targetFile = new BufferedWriter(new FileWriter(targetFileName));
}
@Override

View File

@@ -17,13 +17,16 @@
package io.nosqlbench.loader.hdf.writers;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class NoopVectorWriter extends AbstractVectorWriter {
private static final Logger logger = LogManager.getLogger(NoopVectorWriter.class);
@Override
protected void writeVector(float[] vector) {
    // Deliberately a no-op sink: the vector is discarded, but it is handed to
    // the logger at debug level so tracing can be enabled without code changes.
    logger.debug(vector);
}
@Override

View File

@@ -3,11 +3,11 @@ sourceFile: /home/mwolters138/Documents/hdf5/datasets/pass/glove-25-angular.hdf5
datasets:
- all
embedding: word2vec
writer: noop
writer: filewriter
astra:
scb: /home/mwolters138/Dev/testing/secure-connect-vector-correctness.zip
clientId: IvpdaZejwNuvWeupsIkWTHeL
clientSecret: .bxut2-OQL,dWunZeQbjZC0vMHd88UWXKS.xT,nl95zQC0B0xU9FzSWK3HSUGO11o_7pr7wG7+EMaZqegkKlr4fZ54__furPMtWPGiPp,2cZ1q15vrWwc9_-AcgeCbuf
keyspace: baselines768dot
query: INSERT INTO vectors(key, value) VALUES (?,?)
keyspace: baselines128dot
query: INSERT INTO vectors25(key, value) VALUES (?,?)
targetFile: /home/mwolters138/vectors.txt

View File

@@ -0,0 +1,21 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.nosqlbench.virtdata.library.basics.shared.from_long.to_vector;
public class HdfFileToVectorTest {
    // NOTE(review): placeholder test class added with this commit — no test
    // methods implemented yet; fill in HDF-file-to-vector coverage here.
}