Merge pull request #1925 from nosqlbench/mwolters/nbiocache_enhancements

progress monitor
This commit is contained in:
Mark Wolters 2024-04-12 15:16:21 -04:00 committed by GitHub
commit efc82f03f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -24,7 +24,9 @@ import java.io.*;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel; import java.nio.channels.ReadableByteChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
@ -32,6 +34,8 @@ import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
public class ResolverForNBIOCache implements ContentResolver { public class ResolverForNBIOCache implements ContentResolver {
public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache(); public static final ResolverForNBIOCache INSTANCE = new ResolverForNBIOCache();
@ -76,6 +80,26 @@ public class ResolverForNBIOCache implements ContentResolver {
return null; return null;
} }
private static class ProgressPrinter extends TimerTask {
private final long fileSize;
private long totalBytesRead;
public ProgressPrinter(long fileSize, long totalBytesRead) {
this.fileSize = fileSize;
this.totalBytesRead = totalBytesRead;
}
public void update(long totalBytesRead) {
this.totalBytesRead = totalBytesRead;
}
@Override
public void run() {
double progress = (double) totalBytesRead / fileSize * 100;
logger.info(() -> "Progress: " + String.format("%.2f", progress) + "% completed");
}
}
private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) { private boolean downloadFile(URI uri, Path cachedFilePath, URLContent checksum) {
int retries = 0; int retries = 0;
boolean success = false; boolean success = false;
@ -85,7 +109,20 @@ public class ResolverForNBIOCache implements ContentResolver {
logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachedFilePath); logger.info(() -> "Downloading remote file " + uri + " to cache at " + cachedFilePath);
ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream()); ReadableByteChannel channel = Channels.newChannel(uri.toURL().openStream());
FileOutputStream outputStream = new FileOutputStream(cachedFilePath.toFile()); FileOutputStream outputStream = new FileOutputStream(cachedFilePath.toFile());
outputStream.getChannel().transferFrom(channel, 0, Long.MAX_VALUE); long fileSize = uri.toURL().openConnection().getContentLengthLong();
long totalBytesRead = 0;
FileChannel fileChannel = outputStream.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(32768);
Timer timer = new Timer();
ProgressPrinter printer = new ProgressPrinter(fileSize, 0);
timer.scheduleAtFixedRate(printer, 2000, 2000);
while (channel.read(buffer) != -1) {
buffer.flip();
totalBytesRead += fileChannel.write(buffer);
printer.update(totalBytesRead);
buffer.clear();
}
outputStream.close(); outputStream.close();
channel.close(); channel.close();
logger.info(() -> "Downloaded remote file to cache at " + cachedFilePath); logger.info(() -> "Downloaded remote file to cache at " + cachedFilePath);
@ -110,9 +147,10 @@ public class ResolverForNBIOCache implements ContentResolver {
String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString()); String localChecksumStr = generateSHA256Checksum(cachedFilePath.toString());
Path checksumPath = checksumPath(cachedFilePath); Path checksumPath = checksumPath(cachedFilePath);
Files.writeString(checksumPath, localChecksumStr); Files.writeString(checksumPath, localChecksumStr);
logger.debug(() -> "Generated local checksum and saved to cache at " + checksumPath); logger.info(() -> "Generated local checksum and saved to cache at " + checksumPath);
String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes())); String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes()));
if (localChecksumStr.equals(remoteChecksum)) { if (localChecksumStr.equals(remoteChecksum)) {
logger.info(() -> "Checksums match for " + checksumPath + " and " + checksum);
return true; return true;
} else { } else {
logger.warn(() -> "checksums do not match for " + checksumPath + " and " + checksum); logger.warn(() -> "checksums do not match for " + checksumPath + " and " + checksum);
@ -162,6 +200,7 @@ public class ResolverForNBIOCache implements ContentResolver {
} }
private void cleanupCache(Path cachedFilePath) { private void cleanupCache(Path cachedFilePath) {
logger.info(() -> "Cleaning up cache for " + cachedFilePath);
if (!cachedFilePath.toFile().delete()) if (!cachedFilePath.toFile().delete())
logger.warn(() -> "Could not delete cached file " + cachedFilePath); logger.warn(() -> "Could not delete cached file " + cachedFilePath);
Path checksumPath = checksumPath(cachedFilePath); Path checksumPath = checksumPath(cachedFilePath);
@ -180,6 +219,7 @@ public class ResolverForNBIOCache implements ContentResolver {
if (downloadFile(uri, cachedFilePath, checksum)) { if (downloadFile(uri, cachedFilePath, checksum)) {
return cachedFilePath; return cachedFilePath;
} else { } else {
cleanupCache(cachedFilePath);
throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath); throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath);
} }
case UPDATE_NO_VERIFY: case UPDATE_NO_VERIFY:
@ -187,6 +227,7 @@ public class ResolverForNBIOCache implements ContentResolver {
if (downloadFile(uri, cachedFilePath, null)) { if (downloadFile(uri, cachedFilePath, null)) {
return cachedFilePath; return cachedFilePath;
} else { } else {
cleanupCache(cachedFilePath);
throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath); throw new RuntimeException("Error downloading remote file to cache at " + cachedFilePath);
} }
case LOCAL_VERIFY: case LOCAL_VERIFY:
@ -198,6 +239,7 @@ public class ResolverForNBIOCache implements ContentResolver {
String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath)); String localChecksum = Files.readString(getOrCreateChecksum(cachedFilePath));
String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes())); String remoteChecksum = stripControlCharacters(new String(checksum.getInputStream().readAllBytes()));
if (localChecksum.equals(remoteChecksum)) { if (localChecksum.equals(remoteChecksum)) {
logger.info(() -> "Checksums match, returning cached file " + cachedFilePath);
return cachedFilePath; return cachedFilePath;
} }
else { else {
@ -260,12 +302,20 @@ public class ResolverForNBIOCache implements ContentResolver {
} }
private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException { private static String generateSHA256Checksum(String filePath) throws IOException, NoSuchAlgorithmException {
logger.info(() -> "Generating sha256 checksum for " + filePath);
long fileSize = Files.size(Path.of(filePath));
long totalBytesRead = 0;
Timer timer = new Timer();
ProgressPrinter printer = new ProgressPrinter(fileSize, 0);
timer.scheduleAtFixedRate(printer, 2000, 2000);
MessageDigest md = MessageDigest.getInstance("SHA-256"); MessageDigest md = MessageDigest.getInstance("SHA-256");
try (InputStream is = new FileInputStream(filePath)) { try (InputStream is = new FileInputStream(filePath)) {
byte[] buffer = new byte[8192]; byte[] buffer = new byte[8192];
int bytesRead; int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) { while ((bytesRead = is.read(buffer)) != -1) {
md.update(buffer, 0, bytesRead); md.update(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
printer.update(totalBytesRead);
} }
} }
byte[] digest = md.digest(); byte[] digest = md.digest();