concurrent index cache improvements

track size of concurrent indexes
This commit is contained in:
Jonathan Shook 2024-10-30 13:10:02 -05:00
parent 6ba4ff16fe
commit 5512830f31
3 changed files with 106 additions and 15 deletions

View File

@ -18,7 +18,13 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
*/
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.core.NBNamedElement;
import io.nosqlbench.nb.api.errors.OpConfigError;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import java.util.*;
@ -26,24 +32,49 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.LongFunction;
/**
* <P>This cache implementation packs referents into an atomic array, keeping things as compact as possible,
* allowing auto-resizing, size tracking, and supporting concurrent access with minimal locking. It also uses a bitset
* to track the
* referent indices for enumeration or traversal.</P>
*
* <P>TODO: The referent indices are intended to be drawn from a contiguous set of integer identifiers. If a referent
* index which is extremely large is accessed, this will cause the referent array to be resized, possibly
* causing OOM. Because of this, some sampling methods will likely be applied to this layer to pre-verify
* the likely bounds of provided indices prior to actually using them.</P>
*
* @param <T>
*/
public class ConcurrentIndexCache<T> implements Iterable<T> {
private final static Logger logger = LogManager.getLogger("SPACECACHE");
private final AtomicReference<AtomicReferenceArray<T>> cacheRef;
private static final int GROWTH_FACTOR = 2;
private final LongFunction<T> valueLoader;
private final BitSet active = new BitSet();
private final String label;
private volatile int count = 0;
// Constructor with initial capacity
public ConcurrentIndexCache(LongFunction<T> valueLoader) {
public ConcurrentIndexCache(String label, LongFunction<T> valueLoader) {
this.label = label;
this.valueLoader = valueLoader;
this.cacheRef = new AtomicReference<>(new AtomicReferenceArray<>(1));
}
// Get or compute value if absent, using a valueLoader function
public ConcurrentIndexCache(String label) {
this(label, null);
}
public T get(long longkey) {
return get(longkey, valueLoader);
}
public T get(long longkey, LongFunction<T> defaultValueLoader) {
if (longkey > Integer.MAX_VALUE) {
throw new OpConfigError("space index must be between 0 and " + (Integer.MAX_VALUE - 1) + " inclusive");
}
int key = (int)longkey;
int key = (int) longkey;
AtomicReferenceArray<T> currentCache = cacheRef.get();
@ -55,18 +86,22 @@ public class ConcurrentIndexCache<T> implements Iterable<T> {
T value = currentCache.get(key);
if (value == null) {
T newValue;
synchronized (valueLoader) { // limit construction concurrency to 1 for now to avoid wasteful races
newValue = valueLoader.apply(key);
synchronized (defaultValueLoader) { // limit construction concurrency to 1 for now to avoid wasteful races
newValue = defaultValueLoader.apply(key);
}
// Atomically set the value if it's still null (compare-and-set)
if (currentCache.compareAndSet(key, null, newValue)) {
active.set(key);
count++;
logger.debug(() -> "initializing index[ " + key + " ] for [ " + label + " ] cache");
return newValue;
} else {
// Another thread might have set the value, so return the existing one
return currentCache.get(key);
}
}
logger.debug(() -> "returning index[ " + key + " ] for [ " + label + " ] cache");
return value;
}
@ -100,12 +135,14 @@ public class ConcurrentIndexCache<T> implements Iterable<T> {
T oldValue = currentCache.get(key);
currentCache.set(key, null); // Set the slot to null (safe for garbage collection)
active.clear(key);
count--;
return oldValue;
}
// Optional: Method to clear the entire cache
public void clear() {
cacheRef.set(new AtomicReferenceArray<>(1));
count=0;
}
@Override
@ -113,6 +150,10 @@ public class ConcurrentIndexCache<T> implements Iterable<T> {
return new ElementIterator<>(this);
}
public int size() {
return this.count;
}
public static final class ElementIterator<T> implements @NotNull Iterator<T> {
private final PrimitiveIterator.OfInt iterator;

View File

@ -2,13 +2,13 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
/*
* Copyright (c) 2022 nosqlbench
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -18,11 +18,61 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
*/
import io.nosqlbench.nb.api.components.core.NBBaseComponent;
import io.nosqlbench.nb.api.engine.metrics.instruments.MetricCategory;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
import java.util.function.LongFunction;
public class ConcurrentSpaceCache<S extends Space> extends ConcurrentIndexCache<S> {
public ConcurrentSpaceCache(LongFunction<S> valueLoader) {
super(valueLoader);
/**
* <P>Native driver state in NoSQLBench is stored in a context called a {@link Space}, with each driver adapter
* providing its own implementation. The easiest way to create one of these is to derive from {@link BaseSpace}
* The generic type parameters for the DriverAdapter API propogate through all the related interfaces so that
* op mappers, dispensers, and spaces are familiar to each other. This makes it easy, for example, to use
* support functions from a special adapter implementation in a type-safe and convenient way.
* </P>
*
* <p>A Driver Space Cache is simply a place to hold something like a client instance and all associated objects for
* quick and easy access. Each space cache is simply a named and separate cache of objects. This is provided as a
* convenient way to keep object state around which may be needed during the course of executing operations with a
* driver or API. By naming each space, it becomes possible for tests to create and use separate logical instances of a
* client API for advanced testing. The default instance should simply be named {@code default}</p>
*
* <p>Most native drivers use some combination of fluent, functional, and declarative patterns. These usually require
* you to keep access to a set of core state-holding objects in order to construct new elements to drive operations
* with. An example of this would be creating an executable operation from a session object. It is necessary to keep the
* session around in for when you create new statements. Maintaining the session object is considered an essential part
* of idiomatic and efficient use of the API. Further, you may have builders or factories that are created from the
* session which should be cached as well. Keeping all these objects together requires attaching them to a cohesive
* owning object -- That is the space cache.</p>
*
* @param <S>
* The type which will represent the cache for a given type of adapter.
*/
public class ConcurrentSpaceCache<S extends Space> extends NBBaseComponent implements Iterable<S> {
private final ConcurrentIndexCache<S> cache;
public ConcurrentSpaceCache(DriverAdapter<?, S> adapter, LongFunction<S> valueLoader) {
super(adapter, NBLabels.forKV("spacesof", adapter.getAdapterName()));
this.cache = new ConcurrentIndexCache<>("spacesof_" + adapter.getAdapterName(), valueLoader);
create().gauge(
"spaces",
() -> (double) cache.size(),
MetricCategory.Internals,
"The number of active spaces for this adapter"
);
}
public S get(long l) {
return cache.get(l);
}
@Override
public @NotNull Iterator<S> iterator() {
return cache.iterator();
}
}

View File

@ -2,13 +2,13 @@ package io.nosqlbench.adapters.api.activityimpl.uniform;
/*
* Copyright (c) 2022 nosqlbench
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@ -26,7 +26,7 @@ import static org.junit.jupiter.api.Assertions.*;
class ConcurrentIndexCacheTest {
@Test
public void testBasicCache() {
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>(l -> String.valueOf(l));
ConcurrentIndexCache<String> sc = new ConcurrentIndexCache<>("testing",l -> String.valueOf(l));
String s = sc.get(300);
assertThat(s).isEqualTo("300");
}