mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-01-11 16:32:01 -06:00
added resolver for nbio cache into flow, resolved download issues
This commit is contained in:
parent
dcd4b0fe72
commit
e14761bb08
@ -163,7 +163,7 @@ public class NBIO implements NBPathsAPI.Facets {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public NBPathsAPI.GetPrefixes allContent() {
|
public NBPathsAPI.GetPrefixes allContent() {
|
||||||
this.resolver = URIResolvers.inFS().inCP().inURLs();
|
this.resolver = URIResolvers.inFS().inCP().inURLs().inNBIOCache();
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -0,0 +1,21 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2024 nosqlbench
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.nb.api.nbio;
|
||||||
|
|
||||||
|
public enum NBIOResolverConditions {
|
||||||
|
}
|
@ -101,9 +101,11 @@ public class ResolverForClasspath implements ContentResolver {
|
|||||||
public List<Path> resolveDirectory(URI uri) {
|
public List<Path> resolveDirectory(URI uri) {
|
||||||
List<Path> path = resolvePaths(uri);
|
List<Path> path = resolvePaths(uri);
|
||||||
List<Path> dirs = new ArrayList<>();
|
List<Path> dirs = new ArrayList<>();
|
||||||
for (Path dirpath : path) {
|
if (path != null) {
|
||||||
if (Files.isDirectory(dirpath)) {
|
for (Path dirpath : path) {
|
||||||
dirs.add(dirpath);
|
if (Files.isDirectory(dirpath)) {
|
||||||
|
dirs.add(dirpath);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return dirs;
|
return dirs;
|
||||||
|
@ -35,14 +35,15 @@ import java.util.List;
|
|||||||
public class ResolverForNBIOCache implements ContentResolver {
|
public class ResolverForNBIOCache implements ContentResolver {
|
||||||
public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache();
|
public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache();
|
||||||
private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class);
|
private final static Logger logger = LogManager.getLogger(ResolverForNBIOCache.class);
|
||||||
|
private static final String userHomeDirectory = System.getProperty("user.home");
|
||||||
//TODO: This needs to be set somehow - envvar, yaml setting, etc.
|
//TODO: This needs to be set somehow - envvar, yaml setting, etc.
|
||||||
private static final String cache = "~/.nosqlbench/nbio-cache/";
|
private static String cache = userHomeDirectory + "/.nosqlbench/nbio-cache/";
|
||||||
//TODO: This needs to be set through configuration at runtime
|
//TODO: This needs to be set through configuration at runtime
|
||||||
private final boolean forceUpdate = false;
|
private static boolean forceUpdate = false;
|
||||||
//TODO: This needs to be set through configuration at runtime
|
//TODO: This needs to be set through configuration at runtime
|
||||||
private final boolean verifyChecksum = true;
|
private static boolean verifyChecksum = true;
|
||||||
//TODO: This needs to be set through configuration at runtime
|
//TODO: This needs to be set through configuration at runtime
|
||||||
private final int maxRetries = 3;
|
private static int maxRetries = 3;
|
||||||
@Override
|
@Override
|
||||||
public List<Content<?>> resolve(URI uri) {
|
public List<Content<?>> resolve(URI uri) {
|
||||||
List<Content<?>> contents = new ArrayList<>();
|
List<Content<?>> contents = new ArrayList<>();
|
||||||
@ -64,6 +65,7 @@ public class ResolverForNBIOCache implements ContentResolver {
|
|||||||
* TODO: Need to handle situation where file is in the cache, we want to force update but the update fails.
|
* TODO: Need to handle situation where file is in the cache, we want to force update but the update fails.
|
||||||
* In this case we don't want to delete the local file because we need to return it.
|
* In this case we don't want to delete the local file because we need to return it.
|
||||||
* Suggestion: add enum type defining behavior (force update, for update IF condition x, do not update, etc.)
|
* Suggestion: add enum type defining behavior (force update, for update IF condition x, do not update, etc.)
|
||||||
|
* See NBIOResolverConditions
|
||||||
*/
|
*/
|
||||||
if (uri.getScheme() != null && !uri.getScheme().isEmpty() &&
|
if (uri.getScheme() != null && !uri.getScheme().isEmpty() &&
|
||||||
(uri.getScheme().equalsIgnoreCase("http") ||
|
(uri.getScheme().equalsIgnoreCase("http") ||
|
||||||
@ -86,16 +88,17 @@ public class ResolverForNBIOCache implements ContentResolver {
|
|||||||
try {
|
try {
|
||||||
URLContent urlContent = resolveURI(uri);
|
URLContent urlContent = resolveURI(uri);
|
||||||
if (urlContent != null) {
|
if (urlContent != null) {
|
||||||
|
logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachePath);
|
||||||
Files.copy(urlContent.getInputStream(), cachePath);
|
Files.copy(urlContent.getInputStream(), cachePath);
|
||||||
logger.debug("Downloaded remote file to cache at " + cachePath);
|
logger.info(() -> "Downloaded remote file to cache at " + cachePath);
|
||||||
success = true;
|
success = true;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
logger.error("Error downloading remote file to cache at " + cachePath + ", retrying...");
|
logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying...");
|
||||||
retries++;
|
retries++;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Error downloading remote file to cache at " + cachePath + ", retrying...");
|
logger.error(() -> "Error downloading remote file to cache at " + cachePath + ", retrying...");
|
||||||
retries++;
|
retries++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -116,18 +119,19 @@ public class ResolverForNBIOCache implements ContentResolver {
|
|||||||
* 6a. If the max attempts have been exceeded throw an exception and clean up the cache
|
* 6a. If the max attempts have been exceeded throw an exception and clean up the cache
|
||||||
*/
|
*/
|
||||||
Path cachePath = Path.of(cache + uri.getPath());
|
Path cachePath = Path.of(cache + uri.getPath());
|
||||||
|
createCacheDir(cachePath);
|
||||||
if (downloadFile(uri, cachePath)) {
|
if (downloadFile(uri, cachePath)) {
|
||||||
String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256";
|
String remoteChecksumFileStr = uri.getPath().substring(0, uri.getPath().indexOf('.')) + ".sha256";
|
||||||
try {
|
try {
|
||||||
String localChecksumStr = generateSHA256Checksum(cachePath.toString());
|
String localChecksumStr = generateSHA256Checksum(cachePath.toString());
|
||||||
URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr)));
|
URLContent checksum = resolveURI(URI.create(uri.toString().replace(uri.getPath(), remoteChecksumFileStr)));
|
||||||
if (checksum == null) {
|
if (checksum == null) {
|
||||||
logger.warn("Remote checksum file does not exist");
|
logger.warn(() -> "Remote checksum file " + remoteChecksumFileStr + " does not exist");
|
||||||
return cachePath;
|
return cachePath;
|
||||||
} else {
|
} else {
|
||||||
Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256");
|
Path checksumPath = Path.of(cachePath.toString().substring(0, cachePath.toString().indexOf('.')) + ".sha256");
|
||||||
Files.writeString(checksumPath, localChecksumStr);
|
Files.writeString(checksumPath, localChecksumStr);
|
||||||
logger.debug("Generated local checksum and saved to cache at " + checksumPath);
|
logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath);
|
||||||
String remoteChecksum = new String(checksum.getInputStream().readAllBytes());
|
String remoteChecksum = new String(checksum.getInputStream().readAllBytes());
|
||||||
if (localChecksumStr.equals(remoteChecksum)) {
|
if (localChecksumStr.equals(remoteChecksum)) {
|
||||||
return cachePath;
|
return cachePath;
|
||||||
@ -148,6 +152,17 @@ public class ResolverForNBIOCache implements ContentResolver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void createCacheDir(Path cachePath) {
|
||||||
|
Path dir = cachePath.getParent();
|
||||||
|
if (!Files.exists(dir)) {
|
||||||
|
try {
|
||||||
|
Files.createDirectories(dir);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void cleanupCache() {
|
private void cleanupCache() {
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -233,10 +248,10 @@ public class ResolverForNBIOCache implements ContentResolver {
|
|||||||
try {
|
try {
|
||||||
URL url = uri.toURL();
|
URL url = uri.toURL();
|
||||||
InputStream inputStream = url.openStream();
|
InputStream inputStream = url.openStream();
|
||||||
logger.debug("Found accessible remote file at " + url);
|
logger.debug(() -> "Found accessible remote file at " + url);
|
||||||
return new URLContent(url, inputStream);
|
return new URLContent(url, inputStream);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.error("Unable to find content at URI '" + uri + "', this often indicates a configuration error.");
|
logger.error(() -> "Unable to find content at URI '" + uri + "', this often indicates a configuration error.");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -251,4 +266,21 @@ public class ResolverForNBIOCache implements ContentResolver {
|
|||||||
}
|
}
|
||||||
return dirs;
|
return dirs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void setCache(String cache) {
|
||||||
|
ResolverForNBIOCache.cache = cache;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setForceUpdate(boolean forceUpdate) {
|
||||||
|
ResolverForNBIOCache.forceUpdate = forceUpdate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setVerifyChecksum(boolean verifyChecksum) {
|
||||||
|
ResolverForNBIOCache.verifyChecksum = verifyChecksum;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setMaxRetries(int maxRetries) {
|
||||||
|
ResolverForNBIOCache.maxRetries = maxRetries;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,8 @@ public class URIResolver implements ContentResolver {
|
|||||||
private static final List<ContentResolver> EVERYWHERE = List.of(
|
private static final List<ContentResolver> EVERYWHERE = List.of(
|
||||||
ResolverForURL.INSTANCE,
|
ResolverForURL.INSTANCE,
|
||||||
ResolverForFilesystem.INSTANCE,
|
ResolverForFilesystem.INSTANCE,
|
||||||
ResolverForClasspath.INSTANCE
|
ResolverForClasspath.INSTANCE,
|
||||||
|
ResolverForNBIOCache.INSTANCE
|
||||||
);
|
);
|
||||||
|
|
||||||
private List<String> extensions;
|
private List<String> extensions;
|
||||||
@ -87,6 +88,16 @@ public class URIResolver implements ContentResolver {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Include resources within the NBIO cache or download them if they are not found.
|
||||||
|
*
|
||||||
|
* @return this URISearch
|
||||||
|
*/
|
||||||
|
public URIResolver inNBIOCache() {
|
||||||
|
loaders.add(ResolverForNBIOCache.INSTANCE);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public List<Content<?>> resolve(String uri) {
|
public List<Content<?>> resolve(String uri) {
|
||||||
return resolve(URI.create(uri));
|
return resolve(URI.create(uri));
|
||||||
}
|
}
|
||||||
|
@ -52,4 +52,8 @@ public class URIResolvers {
|
|||||||
public static URIResolver inClasspath() {
|
public static URIResolver inClasspath() {
|
||||||
return new URIResolver().inCP();
|
return new URIResolver().inCP();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static URIResolver inNBIOCache() {
|
||||||
|
return new URIResolver().inNBIOCache();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user