From aef992c3cc3c422c43109d1a099d06d5ca90c65d Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 22 Feb 2024 11:56:04 -0600 Subject: [PATCH] nosqlbench-1841 Provide a binding function which traversed directory lines deterministically --- .../from_long/to_string/DirectoryLines.java | 3 +- .../to_string/DirectoryLinesStable.java | 182 ++++++++++++++++++ .../to_string/DirectoryLinesStableTest.java | 42 ++++ 3 files changed, 226 insertions(+), 1 deletion(-) create mode 100644 virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java create mode 100644 virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java diff --git a/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLines.java b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLines.java index 0d181fd71..aed4648fc 100644 --- a/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLines.java +++ b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLines.java @@ -57,7 +57,7 @@ public class DirectoryLines implements LongFunction { this.basepath = basepath; this.namePattern = Pattern.compile(namePattern); allFiles = getAllFiles(); - if (allFiles.size() == 0) { + if (allFiles.isEmpty()) { throw new RuntimeException("Loaded zero files from " + basepath + ", full path:" + Paths.get(basepath).getFileName()); } pathIterator = allFiles.iterator(); @@ -143,5 +143,6 @@ public class DirectoryLines implements LongFunction { } } + } diff --git a/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java new file mode 100644 index 000000000..e06bf2b27 --- /dev/null +++ b/virtdata-lib-basics/src/main/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStable.java @@ -0,0 +1,182 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.virtdata.library.basics.shared.from_long.to_string; + +import io.nosqlbench.virtdata.api.annotations.Categories; +import io.nosqlbench.virtdata.api.annotations.Category; +import io.nosqlbench.virtdata.api.annotations.Example; +import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.IOException; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.*; +import java.util.function.IntFunction; +import java.util.function.LongFunction; +import java.util.regex.Pattern; + +/** + *

Read each line in each matching file in a directory structure, providing one + * line for each time this function is called. The files are sorted at the time + * the function is initialized, and each line is read in order. + *

+ *

This function accepts long input values, but they are used as ints, modulo the total number of lines known. + * This is due to historic limitations in the Java file APIs and file size support.

+ * + *

This is a variant of {@link DirectoryLines}. This version keeps a map of files and their respective cardinality, + * computed at initialization time. The content is assumed to be static during the lifetime of this function. + *

+ *

+ * The value returned for a given cycle is stable, so long as the underlying data is stable. + *

+ */ +@ThreadSafeMapper +@Categories({Category.general}) +public class DirectoryLinesStable implements LongFunction { + + private final static Logger logger = LogManager.getLogger(DirectoryLinesStable.class); + + private final Pattern namePattern; + private final String basepath; + private final List allFiles; + private int[] sizes; + private final int totalSize; + private List> fileFunctions = new ArrayList<>(); + + @Example({"DirectoryLines('/var/tmp/bardata', '.*')", "load every line from every file in /var/tmp/bardata"}) + public DirectoryLinesStable(String basepath, String namePattern) { + this.basepath = basepath; + this.namePattern = Pattern.compile(namePattern); + allFiles = getAllFiles(); + if (allFiles.isEmpty()) { + throw new RuntimeException("Loaded zero files from " + basepath + ", full path:" + Paths.get(basepath).getFileName()); + } + sizes = new int[allFiles.size()]; + int accumulator = 0; + for (int i = 0; i < allFiles.size(); i++) { + try { + List values = Files.readAllLines(allFiles.get(i)); + fileFunctions.add(values::get); + int lineCount = values.size(); + if (((long) lineCount + (long) accumulator) > Integer.MAX_VALUE) { + throw new RuntimeException("Total number of lines is beyond addressible values for int type. " + + "Reduce total lines to fix this."); + } + accumulator += lineCount; + sizes[i] = accumulator; + } catch (IOException e) { + throw new RuntimeException("Error while reading filepath '" + allFiles.get(i).toString(), e); + } + } + this.totalSize = accumulator; + } + + @Override + public synchronized String apply(long cycle) { + int value = (int) (cycle % totalSize); + int index = 0; + while (value >= sizes[index]) { + value -= sizes[index]; + index++; + } + + IntFunction func = fileFunctions.get(index); + return func.apply(value); + + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("DirectoryLinesStable{"); + sb.append("namePattern=").append(namePattern); + sb.append(", basepath='").append(basepath).append('\''); + sb.append(", allFiles=").append(allFiles); + sb.append(", sizes="); + if (sizes == null) sb.append("null"); + else { + sb.append('['); + for (int i = 0; i < sizes.length; ++i) + sb.append(i == 0 ? "" : ", ").append(sizes[i]); + sb.append(']'); + } + sb.append(", fileFunctions=").append(fileFunctions.size()); + sb.append('}'); + return sb.toString(); + } + + private List getAllFiles() { + logger.debug(() -> "Loading file paths from " + basepath); + Set options = new HashSet<>(); + options.add(FileVisitOption.FOLLOW_LINKS); + FileList fileList = new FileList(namePattern); + + try { + Files.walkFileTree(Paths.get(basepath), options, 10, fileList); + } catch (IOException e) { + throw new RuntimeException(e); + } + logger.debug(() -> "File reader: " + fileList + " in path: " + Paths.get(basepath).getFileName()); + fileList.paths.sort(Path::compareTo); + return fileList.paths; + } + + private static class FileList implements FileVisitor { + public final Pattern namePattern; + public int seen; + public int kept; + public List paths = new ArrayList<>(); + + private FileList(Pattern namePattern) { + this.namePattern = namePattern; + } + + @Override + public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + seen++; + if (file.toString().matches(namePattern.pattern())) { + paths.add(file); + kept++; + } + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException { + logger.warn("Error traversing file: " + file + ":" + exc); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + return FileVisitResult.CONTINUE; + } + + public String toString() { + return "" + kept + "/" + seen + " files with pattern '" + namePattern + "'"; + } + + } + +} + diff --git a/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java new file mode 100644 index 000000000..aebf242c0 --- /dev/null +++ b/virtdata-lib-basics/src/test/java/io/nosqlbench/virtdata/library/basics/shared/from_long/to_string/DirectoryLinesStableTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.virtdata.library.basics.shared.from_long.to_string; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class DirectoryLinesStableTest { + + @Test + public void testStableOrdering() { + DirectoryLinesStable directoryLines = new DirectoryLinesStable("./src/test/resources/static-do-not-change", ".+txt"); + assertThat(directoryLines.apply(0)).isEqualTo("data1.txt-line1"); + assertThat(directoryLines.apply(0)).isEqualTo("data1.txt-line1"); + assertThat(directoryLines.apply(4)).isEqualTo("data1.txt-line5"); + assertThat(directoryLines.apply(5)).isEqualTo("data2.txt-line1"); + assertThat(directoryLines.apply(9)).isEqualTo("data2.txt-line5"); + assertThat(directoryLines.apply(9)).isEqualTo("data2.txt-line5"); + assertThat(directoryLines.apply(10)).isEqualTo("data1.txt-line1"); + assertThat(directoryLines.apply(14)).isEqualTo("data1.txt-line5"); + assertThat(directoryLines.apply(15)).isEqualTo("data2.txt-line1"); + assertThat(directoryLines.apply(19)).isEqualTo("data2.txt-line5"); + assertThat(directoryLines.apply(Long.MAX_VALUE)).isEqualTo("data2.txt-line3"); + } + + +}