Removing the hdf-loader module, which has no relation to nb/nb5.

This commit is contained in:
Mark Wolters 2023-10-06 09:43:59 -04:00
parent 39db0bb4cb
commit dab3c91362
18 changed files with 0 additions and 976 deletions

View File

@ -1,94 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!-- Maven build for the hdf-loader module: reads HDF5 files (via jhdf),
     generates vector embeddings (via deeplearning4j/nd4j), and writes
     them to a file or an Astra/Cassandra table (via the DataStax driver). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>hdf-loader</artifactId>
<packaging>jar</packaging>
<parent>
<artifactId>mvn-defaults</artifactId>
<groupId>io.nosqlbench</groupId>
<version>${revision}</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<name>${project.artifactId}</name>
<dependencies>
<!-- NOTE(review): two distinct YAML parsers are declared below
     (org.snakeyaml:snakeyaml-engine and org.yaml:snakeyaml). LoaderConfig
     only imports org.yaml.snakeyaml — confirm snakeyaml-engine is needed. -->
<dependency>
<groupId>org.snakeyaml</groupId>
<artifactId>snakeyaml-engine</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>2.0</version>
</dependency>
<!-- Cassandra/Astra driver used by AstraVectorWriter. -->
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.16.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.deeplearning4j/deeplearning4j-core -->
<!-- NOTE(review): dl4j/nd4j versions are pinned in lockstep (1.0.0-M2.1);
     consider a shared property if these ever need to move together. -->
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-core</artifactId>
<version>1.0.0-M2.1</version>
</dependency>
<dependency>
<groupId>org.nd4j</groupId>
<artifactId>nd4j-native</artifactId>
<version>1.0.0-M2.1</version>
</dependency>
<dependency>
<groupId>org.deeplearning4j</groupId>
<artifactId>deeplearning4j-nlp</artifactId>
<version>1.0.0-M2.1</version>
</dependency>
<!-- Pure-Java HDF5 reader used by Hdf5Reader. -->
<dependency>
<groupId>io.jhdf</groupId>
<artifactId>jhdf</artifactId>
<version>0.6.10</version>
</dependency>
<!-- NOTE(review): hard-coded snapshot version rather than ${revision};
     confirm this is intentional. -->
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-api</artifactId>
<version>5.17.3-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,83 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf;
import io.nosqlbench.loader.hdf.config.LoaderConfig;
import io.nosqlbench.loader.hdf.readers.Hdf5Reader;
import io.nosqlbench.loader.hdf.readers.HdfReader;
import io.nosqlbench.loader.hdf.writers.AstraVectorWriter;
import io.nosqlbench.loader.hdf.writers.FileVectorWriter;
import io.nosqlbench.loader.hdf.writers.NoopVectorWriter;
import io.nosqlbench.loader.hdf.writers.VectorWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Command-line entry point for the HDF loader. Reads a YAML config file
 * (see {@link LoaderConfig}), wires the configured {@link HdfReader} to the
 * configured {@link VectorWriter}, and runs the read loop. Exits non-zero
 * on any configuration or processing error.
 */
public class HdfLoader {
    private static final Logger logger = LogManager.getLogger(HdfLoader.class);

    // Writer selectors, matched case-insensitively against the config "writer" key.
    public static final String FILEWRITER = "filewriter";
    public static final String ASTRA = "astra";
    public static final String NOOP = "noop";
    // Source formats, matched case-insensitively against the config "format" key.
    public static final String HDF5 = "hdf5";
    public static final String HDF4 = "hdf4";

    /**
     * @param args args[0] is the path to the YAML loader configuration file.
     */
    public static void main (String[] args) {
        if (args.length == 0) {
            // args[0] is a YAML config file (loaded by LoaderConfig), not the HDF file itself.
            System.out.println("Usage: hdf-loader <config file>");
            System.exit(1);
        }
        try {
            LoaderConfig config = new LoaderConfig(args[0]);
            logger.info("Starting loader with config: " + config);
            HdfReader reader = null;
            VectorWriter writer = null;

            // Select the reader; every non-HDF5 path exits, so reader is
            // non-null beyond this switch.
            String format = config.getFormat();
            switch (format.toLowerCase()) {
                case HDF4 -> {
                    logger.info("HDF4 format not yet supported");
                    System.exit(1);
                }
                case HDF5 -> {
                    logger.info("HDF5 format selected");
                    reader = new Hdf5Reader(config);
                }
                default -> {
                    logger.info("Unknown format: " + format);
                    System.exit(1);
                }
            }

            // Select the writer; the default branch exits, so writer is
            // non-null beyond this switch.
            String writerType = config.getWriter();
            logger.info("Using writer type: " + writerType);
            switch (writerType.toLowerCase()) {
                case FILEWRITER -> writer = new FileVectorWriter(config);
                case ASTRA -> writer = new AstraVectorWriter(config);
                case NOOP -> writer = new NoopVectorWriter();
                default -> {
                    logger.info("Unknown writer type: " + writerType);
                    System.exit(1);
                }
            }

            reader.setWriter(writer);
            logger.info("Starting main read loop");
            reader.read();
        } catch (Exception e) {
            // Log message AND stack trace; the original logger.error(e) logged
            // only the exception's toString.
            logger.error(e.getMessage(), e);
            System.exit(1);
        }
    }
}

View File

@ -1,84 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.config;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.yaml.snakeyaml.Yaml;
import java.io.FileReader;
import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * YAML-backed configuration for the HDF loader. Loads the file once at
 * construction into an in-memory map; all getters read from that map,
 * several with documented defaults.
 */
public class LoaderConfig {
    private static final Logger logger = LogManager.getLogger(LoaderConfig.class);
    private static final Yaml yaml = new Yaml();
    private final Map<String, Object> configMap;

    /**
     * Loads the YAML configuration at the given path.
     *
     * @param filePath path to a YAML file whose top level is a mapping
     * @throws IOException if the file cannot be opened or read
     */
    public LoaderConfig(String filePath) throws IOException {
        // try-with-resources: the original leaked this FileReader.
        try (FileReader fileReader = new FileReader(filePath)) {
            configMap = yaml.load(fileReader);
        }
        for (Map.Entry<String, Object> entry : configMap.entrySet()) {
            logger.debug(entry.getKey() + " : " + entry.getValue());
        }
    }

    /** Returns the raw value for a key, or null if absent. */
    public Object getRawValue(String key) {
        return configMap.get(key);
    }

    /** Returns the value for a key as a string; NPEs if the key is absent. */
    public String getStringValue(String key) {
        return configMap.get(key).toString();
    }

    /** Dataset paths to load; the special value "all" means every dataset. */
    @SuppressWarnings("unchecked")
    public List<String> getDatasets() {
        return (List<String>) configMap.get("datasets");
    }

    /** Source file format. */
    public String getFormat() {
        // Default fixed from "HD5" (typo) to "HDF5": the old default could
        // never match HdfLoader's case-insensitive "hdf5" selector, so a
        // config without an explicit "format" key always exited with
        // "Unknown format".
        return (String) configMap.getOrDefault("format", "HDF5");
    }

    /** Astra connection parameters (scb, clientId, clientSecret, keyspace, query). */
    @SuppressWarnings("unchecked")
    public Map<String,String> getAstra() {
        return (Map<String,String>) configMap.get("astra");
    }

    /** Embedding backend name. */
    public String getEmbedding() {
        return (String) configMap.getOrDefault("embedding", "Deeplearning4j");
    }

    /** Writer type: filewriter, astra, or noop. */
    public String getWriter() {
        return (String) configMap.getOrDefault("writer", "filewriter");
    }

    /** Path of the source HDF file. */
    public String getSourceFile() {
        return (String) configMap.get("sourceFile");
    }

    /** Output path used by the file writer. */
    public String getTargetFile() {
        return (String) configMap.getOrDefault("targetFile", "./vectors.txt");
    }

    /** Worker thread count hint. */
    public int getThreads() {
        return (int) configMap.getOrDefault("threads", 5);
    }

    /** Capacity of the reader-to-writer handoff queue. */
    public int getQueueSize() {
        return (int) configMap.getOrDefault("queueSize", 1000);
    }
}

View File

@ -1,62 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.embedding;

/**
 * Embedding generator for double-typed HDF datasets: narrows doubles to
 * floats and normalizes 1-D, 2-D, and 3-D inputs into a float[][] where
 * each row is one vector (3-D inputs have their two inner dimensions
 * collapsed into one row per outermost index).
 */
public class DoubleEmbeddingGenerator implements EmbeddingGenerator {

    @Override
    public float[][] generateEmbeddingFrom(Object o, int[] dims) {
        if (dims.length == 1) {
            // Single vector: wrap as a one-row matrix.
            return new float[][]{convertToFloat((double[]) o)};
        }
        if (dims.length == 2) {
            // Row-by-row narrowing.
            double[][] rows = (double[][]) o;
            float[][] narrowed = new float[rows.length][];
            for (int r = 0; r < rows.length; r++) {
                narrowed[r] = convertToFloat(rows[r]);
            }
            return narrowed;
        }
        if (dims.length == 3) {
            return collapseCube((double[][][]) o, dims);
        }
        throw new RuntimeException("unsupported embedding dimensionality: " + dims.length);
    }

    /** Narrows a double[] to a float[]; returns null for null input. */
    public float[] convertToFloat(double[] doubleArray) {
        if (doubleArray == null) {
            return null;
        }
        float[] narrowed = new float[doubleArray.length];
        int idx = 0;
        for (double value : doubleArray) {
            narrowed[idx++] = (float) value;
        }
        return narrowed;
    }

    /** Collapses a dims[0] x dims[1] x dims[2] cube into dims[0] rows of length dims[1]*dims[2]. */
    private float[][] collapseCube(double[][][] cube, int[] dims) {
        int rows = dims[0];
        int mid = dims[1];
        int inner = dims[2];
        float[][] flat = new float[rows][mid * inner];
        for (int r = 0; r < rows; r++) {
            for (int m = 0; m < mid; m++) {
                for (int k = 0; k < inner; k++) {
                    flat[r][m * inner + k] = (float) cube[r][m][k];
                }
            }
        }
        return flat;
    }
}

View File

@ -1,21 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.embedding;
/**
 * Converts raw HDF dataset data into float vectors.
 * Implementations exist per source element type (int, float, double, string);
 * see EmbeddingGeneratorFactory for selection.
 */
public interface EmbeddingGenerator {
/**
 * Produces one float vector per row of the input.
 *
 * @param o    raw dataset data; the concrete array type (e.g. double[],
 *             double[][]) depends on the implementation and on dims.length
 * @param dims dataset dimensions as reported by the HDF library
 * @return a matrix with one embedding vector per row
 */
float[][] generateEmbeddingFrom(Object o, int[] dims);
}

View File

@ -1,56 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.embedding;
import java.util.HashMap;
import java.util.Map;

/**
 * Caching factory mapping an HDF element type name (as reported by the HDF
 * library, e.g. "float", "Double", "short") to a shared, stateless
 * {@link EmbeddingGenerator} instance.
 */
public class EmbeddingGeneratorFactory {
    // Cache keyed by the NORMALIZED type name. The original keyed it by the
    // raw input string while switching on the normalized one, so "Float",
    // "float" and "FLOAT" each created and cached a separate instance.
    // NOTE(review): a plain HashMap — not safe for concurrent callers.
    private static final Map<String,EmbeddingGenerator> generators = new HashMap<>();

    /**
     * Returns the generator for the given type name (case-insensitive).
     * "short" and "integer" are aliases handled by the int generator.
     *
     * @throws RuntimeException for unrecognized type names
     */
    public static EmbeddingGenerator getGenerator(String type) {
        String key = type.equalsIgnoreCase("short") ? "int" : type.toLowerCase();
        if (key.equals("integer")) key = "int";
        return switch (key) {
            case "string" -> generators.computeIfAbsent(key, k -> new StringEmbeddingGenerator());
            case "float" -> generators.computeIfAbsent(key, k -> new FloatEmbeddingGenerator());
            case "double" -> generators.computeIfAbsent(key, k -> new DoubleEmbeddingGenerator());
            case "int" -> generators.computeIfAbsent(key, k -> new IntEmbeddingGenerator());
            default -> throw new RuntimeException("Unknown embedding type: " + type);
        };
    }
}

View File

@ -1,41 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.embedding;

/**
 * Embedding generator for float-typed HDF datasets. 1-D data becomes a
 * one-row matrix, 2-D data passes through unchanged, and 3-D data has its
 * two inner dimensions collapsed into one row per outermost index.
 */
public class FloatEmbeddingGenerator implements EmbeddingGenerator {

    @Override
    public float[][] generateEmbeddingFrom(Object o, int[] dims) {
        if (dims.length == 1) {
            return new float[][]{(float[]) o};
        }
        if (dims.length == 2) {
            return (float[][]) o;
        }
        if (dims.length == 3) {
            return collapseCube((float[][][]) o, dims);
        }
        throw new RuntimeException("unsupported embedding dimensionality: " + dims.length);
    }

    /** Collapses a dims[0] x dims[1] x dims[2] cube into dims[0] rows of length dims[1]*dims[2]. */
    private float[][] collapseCube(float[][][] cube, int[] dims) {
        int rows = dims[0];
        int mid = dims[1];
        int inner = dims[2];
        float[][] flat = new float[rows][mid * inner];
        for (int r = 0; r < rows; r++) {
            for (int m = 0; m < mid; m++) {
                for (int k = 0; k < inner; k++) {
                    flat[r][m * inner + k] = cube[r][m][k];
                }
            }
        }
        return flat;
    }
}

View File

@ -1,59 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.embedding;

/**
 * Embedding generator for int-typed HDF datasets: widens ints to floats.
 * 1-D data becomes a one-row matrix, 2-D data is converted element-wise,
 * and 3-D data has its two inner dimensions collapsed into one row per
 * outermost index.
 */
public class IntEmbeddingGenerator implements EmbeddingGenerator {

    @Override
    public float[][] generateEmbeddingFrom(Object o, int[] dims) {
        if (dims.length == 1) {
            int[] source = (int[]) o;
            float[] widened = new float[dims[0]];
            for (int i = 0; i < dims[0]; i++) {
                widened[i] = source[i];
            }
            return new float[][]{widened};
        }
        if (dims.length == 2) {
            int[][] source = (int[][]) o;
            float[][] widened = new float[dims[0]][dims[1]];
            for (int r = 0; r < dims[0]; r++) {
                for (int c = 0; c < dims[1]; c++) {
                    widened[r][c] = source[r][c];
                }
            }
            return widened;
        }
        if (dims.length == 3) {
            return collapseCube((int[][][]) o, dims);
        }
        throw new RuntimeException("unsupported embedding dimensionality: " + dims.length);
    }

    /** Collapses a dims[0] x dims[1] x dims[2] cube into dims[0] rows of length dims[1]*dims[2]. */
    private float[][] collapseCube(int[][][] cube, int[] dims) {
        int rows = dims[0];
        int mid = dims[1];
        int inner = dims[2];
        float[][] flat = new float[rows][mid * inner];
        for (int r = 0; r < rows; r++) {
            for (int m = 0; m < mid; m++) {
                for (int k = 0; k < inner; k++) {
                    flat[r][m * inner + k] = cube[r][m][k];
                }
            }
        }
        return flat;
    }
}

View File

@ -1,57 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.embedding;
import org.deeplearning4j.models.word2vec.Word2Vec;
import org.deeplearning4j.text.sentenceiterator.BasicLineIterator;
import org.deeplearning4j.text.sentenceiterator.CollectionSentenceIterator;
import org.deeplearning4j.text.sentenceiterator.SentenceIterator;
import org.deeplearning4j.text.tokenization.tokenizerfactory.DefaultTokenizerFactory;
import org.deeplearning4j.text.tokenization.tokenizerfactory.TokenizerFactory;
import java.util.Arrays;
import java.util.Collections;
/**
 * Embedding generator for string-typed datasets.
 *
 * NOTE(review): this class is an unfinished stub — the Word2Vec pipeline is
 * commented out and generateWordEmbeddings() returns null, which will NPE in
 * any caller that iterates the result (e.g. Hdf5Reader's read loop).
 */
public class StringEmbeddingGenerator implements EmbeddingGenerator {
// Tokenizer for the (currently disabled) Word2Vec pipeline below.
private final TokenizerFactory tokenizerFactory= new DefaultTokenizerFactory();
@Override
public float[][] generateEmbeddingFrom(Object o, int[] dims) {
// Only flat string arrays are handled; higher ranks are rejected.
switch (dims.length) {
case 1 -> {
return generateWordEmbeddings((String[]) o);
}
default -> throw new RuntimeException("unsupported embedding dimensionality: " + dims.length);
}
}
// UNIMPLEMENTED: always returns null (see class note).
private float[][] generateWordEmbeddings(String[] text) {
// NOTE(review): singletonList(text) wraps the whole array as a single
// element, so the iterator yields one "sentence" whose content is the
// array's toString — presumably Arrays.asList(text) was intended; confirm
// before finishing the Word2Vec implementation below.
SentenceIterator iter = new CollectionSentenceIterator(Collections.singletonList(text));
/*Word2Vec vec = new Word2Vec.Builder()
.minWordFrequency(1)
.iterations(1)
.layerSize(targetDims)
.seed(42)
.windowSize(5)
.iterate(iter)
.tokenizerFactory(tokenizerFactory)
.build();
*/
return null;
}
}

View File

@ -1,147 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.readers;
import io.jhdf.HdfFile;
import io.jhdf.api.Dataset;
import io.jhdf.api.Group;
import io.jhdf.api.Node;
import io.nosqlbench.loader.hdf.config.LoaderConfig;
import io.nosqlbench.loader.hdf.embedding.EmbeddingGenerator;
import io.nosqlbench.loader.hdf.writers.VectorWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import static io.nosqlbench.loader.hdf.embedding.EmbeddingGeneratorFactory.getGenerator;
/**
 * HDF5 implementation of HdfReader. Processes each configured dataset on its
 * own pool thread, converts the data to float vectors via the type-matched
 * EmbeddingGenerator, and hands vectors to the writer through a bounded
 * blocking queue. The writer runs on the same pool and is stopped with a
 * zero-length sentinel vector (SHUTDOWN) once all datasets are drained.
 */
public class Hdf5Reader implements HdfReader {
private static final Logger logger = LogManager.getLogger(Hdf5Reader.class);
// Config value meaning "load every dataset found in the file".
public static final String ALL = "all";
private VectorWriter writer;
private final LoaderConfig config;
private final ExecutorService executorService;
// Bounded handoff between dataset threads (producers) and the writer (consumer);
// capacity comes from the config queueSize (default 1000).
private final LinkedBlockingQueue<float[]> queue;
private List<String> datasets;
// Zero-length sentinel: AbstractVectorWriter.run() breaks when it takes it.
private final float[] SHUTDOWN = new float[0];
public Hdf5Reader(LoaderConfig config) {
this.config = config;
executorService = Executors.newCachedThreadPool();
queue = new LinkedBlockingQueue<>(config.getQueueSize());
}
@Override
public void setWriter(VectorWriter writer) {
this.writer = writer;
// Share the handoff queue with the writer before it is submitted to the pool.
writer.setQueue(queue);
}
/**
 * Recursively collects the paths of all Dataset nodes under the given group.
 * NOTE(review): appends into the list returned by config.getDatasets(),
 * mutating the config's backing list in place.
 */
public void extractDatasets(Group parent) {
Map<String, Node> nodes = parent.getChildren();
for (String key : nodes.keySet()) {
Node node = nodes.get(key);
if (node instanceof Dataset) {
datasets.add(node.getPath());
}
else if (node.isGroup()) {
extractDatasets((Group) node);
}
}
}
/**
 * Main read loop: resolves the dataset list, starts the writer, processes
 * each dataset concurrently, then waits for completion and shuts down.
 * NOTE(review): if config.getDatasets() is null or empty this NPEs /
 * throws at datasets.get(0) — confirm the config always supplies "datasets".
 */
@Override
public void read() {
HdfFile hdfFile = new HdfFile(Paths.get(config.getSourceFile()));
datasets = config.getDatasets();
if (datasets.get(0).equalsIgnoreCase(ALL)) {
extractDatasets(hdfFile);
}
List<Future<?>> futures = new ArrayList<>();
// The writer is a Runnable consuming from the queue on its own pool thread.
executorService.submit(writer);
for (String ds : datasets) {
// Skip the "all" marker itself; the extracted paths follow it in the list.
if (ds.equalsIgnoreCase(ALL)) {
continue;
}
Future<?> future = executorService.submit(() -> {
logger.info("Processing dataset: " + ds);
Dataset dataset = hdfFile.getDatasetByPath(ds);
int[] dims = dataset.getDimensions();
// Pick the embedding generator by the dataset's Java element type
// (e.g. "int", "float", "double", "string").
String type = dataset.getJavaType().getSimpleName().toLowerCase();
EmbeddingGenerator generator = getGenerator(type);
Object data;
if (dataset.getSizeInBytes() > Integer.MAX_VALUE) {
logger.info("slicing large dataset: " + ds);
// TODO: For now this will be implemented to handle numeric types with
// 2 dimensions where the 1st dimension is the number of vectors and the 2nd
// dimension is the number of dimensions in the vector.
// NOTE(review): the slicing below reads dims[1], so it assumes a
// rank-2 dataset (matches the TODO above); other ranks would need
// different handling. The last partial slice (dims[0] % noOfSlices
// rows) appears to be dropped — confirm.
long[] sliceOffset = new long[dims.length];
int[] sliceDimensions = new int[dims.length];
sliceDimensions[1] = dims[1];
int noOfSlices = (int) (dataset.getSizeInBytes() / Integer.MAX_VALUE) + 1;
int sliceSize = dims[0] / noOfSlices;
for (int i = 0; i < noOfSlices; i++) {
sliceOffset[0] = (long) i * sliceSize;
sliceDimensions[0] = sliceSize;
data = dataset.getData(sliceOffset, sliceDimensions);
float[][] vectors = generator.generateEmbeddingFrom(data, dims);
for (float[] vector : vectors) {
try {
// Blocks when the queue is full, throttling readers to writer speed.
queue.put(vector);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
} else {
// Small enough to load in one call.
data = dataset.getData();
float[][] vectors = generator.generateEmbeddingFrom(data, dims);
for (float[] vector : vectors) {
try {
queue.put(vector);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
});
futures.add(future);
}
// Wait for every dataset task; failures are logged, not propagated.
for (Future<?> future : futures) {
try {
future.get();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
hdfFile.close();
// Order matters: raise the writer's shutdown flag first, then enqueue the
// sentinel so a writer blocked in queue.take() wakes up and exits.
writer.shutdown();
try {
queue.put(SHUTDOWN);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
executorService.shutdown();
}
}

View File

@ -1,25 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.readers;
import io.nosqlbench.loader.hdf.writers.VectorWriter;
/**
 * A source of embedding vectors read from an HDF file.
 * Usage: call setWriter() with the destination, then read() to run the
 * (blocking) extraction loop.
 */
public interface HdfReader {
/** Sets the destination for extracted vectors; must be called before read(). */
void setWriter(VectorWriter writer);
/** Runs the extraction loop; blocks until the source is fully processed. */
void read();
}

View File

@ -1,46 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.writers;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Base class for vector writers: drains float vectors from a shared blocking
 * queue on its own thread and delegates each one to {@link #writeVector}.
 * The loop ends when shutdown() has been called and the queue is drained, or
 * when a zero-length sentinel vector is taken (see Hdf5Reader.SHUTDOWN).
 */
public abstract class AbstractVectorWriter implements VectorWriter {
    protected LinkedBlockingQueue<float[]> queue;
    // volatile: written by the reader thread (via shutdown()), read by the
    // writer thread's run() loop. The original non-volatile flag had no
    // visibility guarantee across those threads.
    protected volatile boolean shutdown = false;

    /** Supplies the reader-to-writer handoff queue; must be set before run(). */
    public void setQueue(LinkedBlockingQueue<float[]> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (!shutdown || !queue.isEmpty()) {
            try {
                float[] vector = queue.take();
                if (vector.length==0) {
                    break;  // zero-length sentinel: terminate even while blocked in take()
                }
                writeVector(vector);
            } catch (InterruptedException e) {
                // Restore the interrupt status before bailing out, per the
                // standard InterruptedException contract.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /** Persists one vector; called once per queue element on the writer thread. */
    protected abstract void writeVector(float[] vector);
}

View File

@ -1,68 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.writers;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.loader.hdf.config.LoaderConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.file.Paths;
import java.util.Map;
/**
 * Vector writer that inserts each vector into an Astra (Cassandra) table
 * using the DataStax driver. Connection settings and the parameterized
 * insert statement come from the "astra" section of the loader config
 * (keys: scb, clientId, clientSecret, keyspace, query).
 */
public class AstraVectorWriter extends AbstractVectorWriter {
private static final Logger logger = LogManager.getLogger(AstraVectorWriter.class);
private final CqlSession session;
PreparedStatement insert_vector;
public AstraVectorWriter(LoaderConfig config) {
Map<String,String> astraParams = config.getAstra();
// Secure-connect-bundle + token credentials, scoped to the target keyspace.
session = CqlSession.builder()
.withCloudSecureConnectBundle(Paths.get(astraParams.get("scb")))
.withAuthCredentials(astraParams.get("clientId"), astraParams.get("clientSecret"))
.withKeyspace(astraParams.get("keyspace"))
.build();
logger.info("Astra session initialized");
// "query" is expected to be a 2-parameter INSERT: (partition key, vector).
insert_vector = session.prepare(astraParams.get("query"));
}
//TODO: this is insanely slow. Needs work on threading/batching
@Override
protected void writeVector(float[] vector) {
// Box to Float[] for the driver's CqlVector builder.
Float[] vector2 = new Float[vector.length];
for (int i = 0; i < vector.length; i++) {
vector2[i] = vector[i];
}
CqlVector.Builder vectorBuilder = CqlVector.builder();
vectorBuilder.add(vector2);
// Synchronous, one statement per vector — see the TODO above.
session.execute(insert_vector.bind(getPartitionValue(vector), vectorBuilder.build()));
}
// NOTE(review): partition key is the sum of the vector's components, so
// distinct vectors with equal sums collide on the same key — presumably
// acceptable for this loader; confirm against the target schema.
private String getPartitionValue(float[] vector) {
float sum = 0;
for (float f : vector) {
sum += f;
}
return String.valueOf(sum);
}
@Override
public void shutdown() {
// Signals the base-class run() loop to exit once the queue drains.
// NOTE(review): the CqlSession is never closed — confirm whether process
// exit is relied upon here.
shutdown = true;
}
}

View File

@ -1,56 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.writers;
import io.nosqlbench.loader.hdf.config.LoaderConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.*;

/**
 * Vector writer that appends each vector to a text file as one
 * bracketed, comma-separated line, e.g. "[1.0,2.0,3.0]".
 * The target path comes from the config's targetFile setting.
 */
public class FileVectorWriter extends AbstractVectorWriter {
    private static final Logger logger = LogManager.getLogger(FileVectorWriter.class);
    private final BufferedWriter targetFile;

    /**
     * Opens (and truncates) the configured target file for writing.
     *
     * @throws IOException if the file cannot be created or opened
     */
    public FileVectorWriter(LoaderConfig config) throws IOException {
        String targetFileName = config.getTargetFile();
        targetFile = new BufferedWriter(new FileWriter(targetFileName));
        logger.info("Writing to file: " + targetFileName);
    }

    @Override
    protected void writeVector(float[] vector) {
        try {
            // Assemble the whole line first, then emit it with a single write.
            StringBuilder line = new StringBuilder("[");
            for (int i = 0; i < vector.length; i++) {
                if (i > 0) {
                    line.append(",");
                }
                line.append(vector[i]);
            }
            line.append("]");
            line.append("\n");
            targetFile.write(line.toString());
            // Flush per vector so the output is usable even if we die mid-run.
            targetFile.flush();
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
    }

    @Override
    public void shutdown() {
        // Signals the base-class run() loop to exit once the queue drains.
        shutdown = true;
    }
}

View File

@ -1,35 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.writers;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * Vector writer that discards every vector. Useful for benchmarking the
 * read/embedding side of the pipeline without any write cost.
 */
public class NoopVectorWriter extends AbstractVectorWriter {
    private static final Logger logger = LogManager.getLogger(NoopVectorWriter.class);

    @Override
    protected void writeVector(float[] vector) {
        //No-op
        // Parameterized debug log of the length; the original logger.debug(vector)
        // logged only the array's identity toString (e.g. "[F@1a2b3c"), which
        // carried no useful information.
        logger.debug("discarding vector of length {}", vector.length);
    }

    @Override
    public void shutdown() {
        // Signals the base-class run() loop to exit once the queue drains.
        shutdown = true;
    }
}

View File

@ -1,25 +0,0 @@
/*
 * Copyright (c) 2023 nosqlbench
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.nosqlbench.loader.hdf.writers;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * A sink for embedding vectors. Runs on its own thread (Runnable), consuming
 * vectors from a queue shared with the reader; see AbstractVectorWriter for
 * the standard consumption loop.
 */
public interface VectorWriter extends Runnable {
/** Supplies the reader-to-writer handoff queue; must be called before run(). */
void setQueue(LinkedBlockingQueue<float[]> queue);
/** Requests the run() loop to stop once the queue is drained. */
void shutdown();
}

View File

@ -1,14 +0,0 @@
# Sample hdf-loader configuration (loaded by LoaderConfig; see that class
# for defaults). Replace the <<...>> placeholders before use.
format: HDF5
sourceFile: <<insert location of source hdf5 file>>
# Dataset paths to load; the special value "all" loads every dataset found.
datasets:
- all
embedding: word2vec
# One of: filewriter, astra, noop
writer: filewriter
# Astra connection settings — only used when writer is "astra".
astra:
scb: <<insert location of Astra scb>>
clientId: <<Astra client ID>>
clientSecret: <<Astra client secret>>
keyspace: <<Name of Astra keyspace>>
# Two-parameter insert: (partition key, vector) — see AstraVectorWriter.
query: INSERT INTO vectors25(key, value) VALUES (?,?)
# Output path — only used when writer is "filewriter".
targetFile: <<insert location of target file>>

View File

@ -66,8 +66,6 @@
<module.adapter-kafka>adapter-amqp</module.adapter-kafka>
<module.adapter-jdbc>adapter-jdbc</module.adapter-jdbc>
<!-- <module.hdf-loader>hdf-loader</module.hdf-loader>-->
<!-- VIRTDATA MODULES -->
<module.virtdata-api>virtdata-api</module.virtdata-api>
<module.virtdata-lang>virtdata-lang</module.virtdata-lang>
@ -114,7 +112,6 @@
<module>adapter-jdbc</module>
<module>adapter-pgvector</module>
<module>adapter-pinecone</module>
<!-- <module>hdf-loader</module>-->
<!-- VIRTDATA MODULES -->
<module>virtdata-api</module>