filling out reader functionality

Mark Wolters 2023-08-01 09:24:48 -04:00
parent 654267b524
commit e5d7c7fa07
9 changed files with 69 additions and 9 deletions

View File

@@ -58,6 +58,12 @@
 <version>2.6.1</version>
 </dependency>
+<dependency>
+<groupId>com.datastax.oss</groupId>
+<artifactId>java-driver-core</artifactId>
+<version>4.16.0</version>
+</dependency>
 </dependencies>
 </project>
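
Editor's note: java-driver-core 4.16.0 ships the driver's CqlVector class that the AstraVectorWriter change below imports. A minimal sanity check that the type resolves against this dependency (a sketch, not part of this commit):

    import com.datastax.oss.driver.api.core.data.CqlVector;

    public class VectorTypeCheck {
        public static void main(String[] args) {
            // Build a small vector value; newInstance boxes the float varargs.
            CqlVector<Float> v = CqlVector.newInstance(0.1f, 0.2f, 0.3f);
            System.out.println(v.size()); // prints 3
        }
    }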

View File

@@ -18,13 +18,13 @@
 package io.nosqlbench.loader.hdf;
 import io.nosqlbench.loader.hdf.config.LoaderConfig;
-import io.nosqlbench.loader.hdf.readers.HdfReaders;
+import io.nosqlbench.loader.hdf.readers.HdfReaderTypes;
 import io.nosqlbench.loader.hdf.readers.Hdf5Reader;
 import io.nosqlbench.loader.hdf.readers.HdfReader;
 import io.nosqlbench.loader.hdf.writers.AstraVectorWriter;
 import io.nosqlbench.loader.hdf.writers.FileVectorWriter;
 import io.nosqlbench.loader.hdf.writers.VectorWriter;
-import io.nosqlbench.loader.hdf.writers.VectorWriters;
+import io.nosqlbench.loader.hdf.writers.VectorWriterTypes;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -44,7 +44,7 @@ public class HdfLoader {
 VectorWriter writer = null;
 String format = config.getFormat();
-switch (HdfReaders.valueOf(format)) {
+switch (HdfReaderTypes.valueOf(format)) {
 case HDF4 -> {
 logger.info("HDF4 format not yet supported");
 System.exit(1);
@@ -59,7 +59,7 @@ public class HdfLoader {
 }
 String writerType = config.getWriter();
-switch (VectorWriters.valueOf(writerType)) {
+switch (VectorWriterTypes.valueOf(writerType)) {
 case filewriter -> {
 writer = new FileVectorWriter(config);
 }
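
Editor's note on both switches: Enum.valueOf is case-sensitive and throws IllegalArgumentException on unrecognized input, so a config value like "hdf5" would abort before reaching either case arm. A defensive variant for the reader dispatch (a sketch under that assumption, not in this commit):

    // Sketch: normalize the configured format and fail with a clear message.
    HdfReaderTypes type;
    try {
        type = HdfReaderTypes.valueOf(format.trim().toUpperCase());
    } catch (IllegalArgumentException e) {
        logger.error("Unrecognized HDF format: " + format);
        System.exit(1);
        return; // keeps the compiler's definite-assignment check satisfied
    }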

View File

@@ -74,4 +74,8 @@ public class LoaderConfig {
 public int getThreads() {
 return (int) configMap.getOrDefault("threads", 1);
 }
+public int getQueueSize() {
+return (int) configMap.getOrDefault("queueSize", 1000);
+}
 }
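
Editor's note: the unchecked (int) cast in these accessors assumes whatever parser built configMap produced Integer values; if a YAML or properties loader hands back a String, the cast throws ClassCastException. A more tolerant accessor might look like this (a sketch under that assumption):

    // Defensive variant (sketch, not in this commit): normalize numeric config
    // values that may arrive as Integer or String before the int conversion.
    public int getQueueSize() {
        Object raw = configMap.getOrDefault("queueSize", 1000);
        return (raw instanceof Number n) ? n.intValue() : Integer.parseInt(raw.toString());
    }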

View File

@@ -22,6 +22,7 @@ import io.nosqlbench.loader.hdf.writers.VectorWriter;
 import ncsa.hdf.hdf5lib.H5;
 import ncsa.hdf.hdf5lib.HDF5Constants;
+import ncsa.hdf.hdf5lib.exceptions.HDF5Exception;
 import ncsa.hdf.hdf5lib.exceptions.HDF5LibraryException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -29,15 +30,18 @@ import org.apache.logging.log4j.Logger;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 public class Hdf5Reader implements HdfReader {
 private static final Logger logger = LogManager.getLogger(Hdf5Reader.class);
 private VectorWriter writer;
 private final LoaderConfig config;
 private final ExecutorService executorService;
+private final LinkedBlockingQueue<float[]> queue;
 public Hdf5Reader(LoaderConfig config) {
 this.config = config;
 executorService = Executors.newFixedThreadPool(config.getThreads());
+queue = new LinkedBlockingQueue<>(config.getQueueSize());
 }
 @Override
@@ -57,12 +61,31 @@ public class Hdf5Reader implements HdfReader {
 int datasetId = H5.H5Dopen(fileId, dataset.get("name"));
 // Get the dataspace of the dataset
 int dataspaceId = H5.H5Dget_space(datasetId);
-} catch (HDF5LibraryException e) {
+// Get the number of dimensions in the dataspace
+int numDimensions = H5.H5Sget_simple_extent_ndims(dataspaceId);
+float[] vector = new float[numDimensions];
+long[] dims = new long[numDimensions];
+// Get the datatype of the dataset
+int datatypeId = H5.H5Dget_type(datasetId);
+// Get the size of each dimension
+H5.H5Sget_simple_extent_dims(dataspaceId, dims, null);
+// Read the data from the dataset
+double[] data = new double[(int) dims[0]];
+H5.H5Dread(datasetId, datatypeId, HDF5Constants.H5S_ALL, HDF5Constants.H5S_ALL,
+    HDF5Constants.H5P_DEFAULT, data);
+// Close the dataspace, datatype, and dataset
+H5.H5Sclose(dataspaceId);
+H5.H5Tclose(datatypeId);
+H5.H5Dclose(datasetId);
+queue.put(vector);
+} catch (HDF5Exception e) {
 logger.error(e);
+} catch (InterruptedException e) {
+throw new RuntimeException(e);
 }
 });
}
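
Editor's note: as committed, queue.put(vector) enqueues the zero-filled float[numDimensions] allocated earlier; the doubles read into data are never copied over, and numDimensions is the dataspace rank rather than the vector length. A sketch of the presumably intended hand-off, assuming the surrounding Hdf5Reader handles and fields (not the committed code):

    // Convert the row just read into the float[] the writers consume, then
    // enqueue it; put() blocks when the queue is at capacity (backpressure).
    float[] vector = new float[data.length];
    for (int i = 0; i < data.length; i++) {
        vector[i] = (float) data[i];
    }
    queue.put(vector);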

View File

@@ -17,7 +17,7 @@
 package io.nosqlbench.loader.hdf.readers;
-public enum HdfReaders {
+public enum HdfReaderTypes {
 HDF4,
 HDF5
 }

View File

@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2023 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package io.nosqlbench.loader.hdf.writers;
+import java.util.concurrent.LinkedBlockingQueue;
+public abstract class AbstractVectorWriter implements VectorWriter {
+protected LinkedBlockingQueue<float[]> queue;
+}
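
Editor's note: the new base class so far holds only the shared hand-off queue. A plausible shape for the consuming side, where run() and writeVector() are illustrative names rather than anything this commit defines:

    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative sketch only: drains the reader's queue on the writer's thread.
    public abstract class AbstractVectorWriter implements VectorWriter {
        protected LinkedBlockingQueue<float[]> queue;

        public void run() {
            try {
                while (true) {
                    float[] vector = queue.take(); // blocks until the reader enqueues a vector
                    writeVector(vector);           // hypothetical per-implementation hook
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status and stop
            }
        }

        protected abstract void writeVector(float[] vector);
    }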

View File

@@ -18,8 +18,9 @@
 package io.nosqlbench.loader.hdf.writers;
 import io.nosqlbench.loader.hdf.config.LoaderConfig;
+import com.datastax.oss.driver.api.core.data.CqlVector;
-public class AstraVectorWriter implements VectorWriter {
+public class AstraVectorWriter extends AbstractVectorWriter {
 public AstraVectorWriter(LoaderConfig config) {
 }
 }
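
Editor's note: the constructor is still empty; presumably it will create a CqlSession and prepare an INSERT. A rough sketch of where the CqlVector import points, in which session, insertStatement, and the bind order are placeholders rather than anything in this commit:

    // Hypothetical write path (sketch): bind a float[] as a CQL vector value.
    protected void writeVector(float[] vector) {
        Float[] boxed = new Float[vector.length];
        for (int i = 0; i < vector.length; i++) {
            boxed[i] = vector[i];          // box for the driver's generic vector type
        }
        CqlVector<Float> value = CqlVector.newInstance(boxed);
        // session: a CqlSession; insertStatement: a PreparedStatement such as
        // "INSERT INTO vectors (id, embedding) VALUES (?, ?)"
        session.execute(insertStatement.bind(java.util.UUID.randomUUID(), value));
    }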

View File

@@ -17,7 +17,7 @@
 package io.nosqlbench.loader.hdf.writers;
-public enum VectorWriters {
+public enum VectorWriterTypes {
 astra,
 filewriter
 }

View File

@@ -67,6 +67,7 @@
 <module.adapter-kafka>adapter-kafka</module.adapter-kafka>
 <module.adapter-amqp>adapter-amqp</module.adapter-amqp>
 <module.adapter-jdbc>adapter-jdbc</module.adapter-jdbc>
+<module.hdf-loader>hdf-loader</module.hdf-loader>
 <!-- VIRTDATA MODULES -->
 <module.virtdata-api>virtdata-api</module.virtdata-api>
@@ -114,6 +115,7 @@
 <module>adapter-amqp</module>
 <module>adapter-jdbc</module>
 <module>adapter-pinecone</module>
+<module>hdf-loader</module>
 <!-- VIRTDATA MODULES -->
 <module>virtdata-api</module>