diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java new file mode 100644 index 000000000..93a70efa6 --- /dev/null +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCache.java @@ -0,0 +1,137 @@ +package io.nosqlbench.adapters.api.activityimpl.uniform; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import io.nosqlbench.nb.api.errors.OpConfigError; +import org.jetbrains.annotations.NotNull; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.LongFunction; + +public class ConcurrentIndexCache implements Iterable { + private final AtomicReference> cacheRef; + private static final int GROWTH_FACTOR = 2; + private final LongFunction valueLoader; + private final BitSet active = new BitSet(); + + // Constructor with initial capacity + public ConcurrentIndexCache(LongFunction valueLoader) { + this.valueLoader = valueLoader; + this.cacheRef = new AtomicReference<>(new AtomicReferenceArray<>(1)); + } + + // Get or compute value if absent, using a valueLoader function + public T get(long longkey) { + if (longkey > Integer.MAX_VALUE) { + throw new OpConfigError("space index must be between 0 and " + (Integer.MAX_VALUE - 1) + " inclusive"); + } + int key = (int)longkey; + + AtomicReferenceArray currentCache = cacheRef.get(); + + if (key >= currentCache.length()) { + resize(key); + currentCache = cacheRef.get(); // Get the updated array after resizing + } + + T value = currentCache.get(key); + if (value == null) { + T newValue; + synchronized (valueLoader) { // limit construction concurrency to 1 for now to avoid wasteful races + newValue = valueLoader.apply(key); + } + // Atomically set the value if it's still null (compare-and-set) + if (currentCache.compareAndSet(key, null, newValue)) { + active.set(key); + return newValue; + } else { + // Another thread might have set the value, so return the existing one + return currentCache.get(key); + } + } + return value; + } + + // Method to resize the array if key exceeds current capacity + private synchronized void resize(int key) { + AtomicReferenceArray currentCache = cacheRef.get(); + if (key < currentCache.length()) { + return; // Double-check locking to avoid multiple resizes + } + + // Calculate new size (at least as large as key + 1) + int newCapacity = Math.max(currentCache.length() * GROWTH_FACTOR, key + 1); + AtomicReferenceArray newCache = new AtomicReferenceArray<>(newCapacity); + + // Copy elements from old cache to new cache + for (int i = 0; i < currentCache.length(); i++) { + newCache.set(i, currentCache.get(i)); + } + + // Atomically update the cache reference + cacheRef.set(newCache); + } + + // Optional: Method to remove an entry + public T remove(int key) { + AtomicReferenceArray currentCache = cacheRef.get(); + if (key >= currentCache.length()) { + return null; // Key is out of bounds + } + + T oldValue = currentCache.get(key); + currentCache.set(key, null); // Set the slot to null (safe for garbage collection) + active.clear(key); + return oldValue; + } + + // Optional: Method to clear the entire cache + public void clear() { + cacheRef.set(new AtomicReferenceArray<>(1)); + } + + @Override + public @NotNull Iterator iterator() { + return new ElementIterator<>(this); + } + + public static final class ElementIterator implements @NotNull Iterator { + + private final PrimitiveIterator.OfInt iterator; + private final ConcurrentIndexCache indexCache; + + public ElementIterator(ConcurrentIndexCache ts) { + this.indexCache = ts; + iterator = ts.active.stream().iterator(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public T next() { + int index = this.iterator.nextInt(); + return indexCache.get(index); + } + } +} diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentSpaceCache.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentSpaceCache.java new file mode 100644 index 000000000..de3582a1a --- /dev/null +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentSpaceCache.java @@ -0,0 +1,28 @@ +package io.nosqlbench.adapters.api.activityimpl.uniform; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.util.function.LongFunction; + +public class ConcurrentSpaceCache extends ConcurrentIndexCache { + public ConcurrentSpaceCache(LongFunction valueLoader) { + super(valueLoader); + } + +} diff --git a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java index f35f0d81b..a05adeb14 100644 --- a/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java +++ b/nb-apis/adapters-api/src/main/java/io/nosqlbench/adapters/api/activityimpl/uniform/DriverAdapter.java @@ -146,7 +146,7 @@ public interface DriverAdapter extends NBComponent * * @return A cache of named objects */ - StringDriverSpaceCache getSpaceCache(); + ConcurrentSpaceCache getSpaceCache(); /** * This method allows each driver adapter to create named state which is automatically diff --git a/nb-apis/adapters-api/src/test/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheTest.java b/nb-apis/adapters-api/src/test/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheTest.java new file mode 100644 index 000000000..4d313d6d8 --- /dev/null +++ b/nb-apis/adapters-api/src/test/java/io/nosqlbench/adapters/api/activityimpl/uniform/ConcurrentIndexCacheTest.java @@ -0,0 +1,34 @@ +package io.nosqlbench.adapters.api.activityimpl.uniform; + +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +class ConcurrentIndexCacheTest { + @Test + public void testBasicCache() { + ConcurrentIndexCache sc = new ConcurrentIndexCache<>(l -> String.valueOf(l)); + String s = sc.get(300); + assertThat(s).isEqualTo("300"); + } + +}