Skip to content

Commit

Permalink
Add possibility to manual db-cache invalidation
Browse files Browse the repository at this point in the history
Related to JanusGraph#3155

Signed-off-by: Oleksandr Porunov <[email protected]>
  • Loading branch information
porunov committed Aug 31, 2022
1 parent 065a48f commit 8d4cdbf
Show file tree
Hide file tree
Showing 9 changed files with 688 additions and 19 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.tinkerpop.gremlin.util.Gremlin;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.graphdb.configuration.JanusGraphConstants;
import org.janusgraph.graphdb.database.cache.CacheInvalidationService;

/**
* JanusGraph graph database implementation of the Blueprint's interface.
Expand Down Expand Up @@ -157,6 +158,8 @@ public interface JanusGraph extends Transaction {
@Override
void close() throws JanusGraphException;

CacheInvalidationService getDBCacheInvalidationService();

/**
* The version of this JanusGraph graph database
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -708,4 +708,12 @@ static ExecutorService buildExecutorService(Configuration configuration){
}
return executorService;
}

public KCVSCache getEdgeStoreCache(){
return edgeStore;
}

public KCVSCache getIndexStoreCache(){
return indexStore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,16 @@ public Map<StaticBuffer,EntryList> getSlice(final List<StaticBuffer> keys, final

@Override
public void clearCache() {
// We should not call `expiredKeys.clear();` directly because there could be a race condition
// where already invalidated cache but then added new entries into it and made some mutation before `expiredKeys.clear();`
// is finished which may result in getSlice to return previously cached result and not a new mutated result.
// Thus, we are clearing expired entries first, and then we are safe to invalidate the rest of non-expired entries.
// Moreover, we shouldn't create `penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);` because the cleaning thread
// may await on the previous `penaltyCountdown` which may result in that thread never wake up to proceed with
// probabilistic cleaning. Thus, only that cleaning thread have to have a right to reinitialize `penaltyCountdown`.
forceClearExpiredCache();
// It's always safe to invalidate full cache
cache.invalidateAll();
expiredKeys.clear();
penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);
}

@Override
Expand All @@ -151,6 +158,31 @@ public void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries) {
if (Math.random()<1.0/INVALIDATE_KEY_FRACTION_PENALTY) penaltyCountdown.countDown();
}

@Override
public void forceClearExpiredCache() {
clearExpiredCache(false);
}

private synchronized void clearExpiredCache(boolean withNewPenaltyCountdown) {
//Do clean up work by invalidating all entries for expired keys
final Map<StaticBuffer,Long> expiredKeysCopy = new HashMap<>(expiredKeys.size());
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeys.entrySet()) {
if (isBeyondExpirationTime(expKey.getValue()))
expiredKeys.remove(expKey.getKey(), expKey.getValue());
else if (getAge(expKey.getValue())>= invalidationGracePeriodMS)
expiredKeysCopy.put(expKey.getKey(),expKey.getValue());
}
for (KeySliceQuery ksq : cache.asMap().keySet()) {
if (expiredKeysCopy.containsKey(ksq.getKey())) cache.invalidate(ksq);
}
if(withNewPenaltyCountdown){
penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);
}
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeysCopy.entrySet()) {
expiredKeys.remove(expKey.getKey(),expKey.getValue());
}
}

@Override
public void close() throws BackendException {
cleanupThread.stopThread();
Expand Down Expand Up @@ -203,21 +235,7 @@ public void run() {
if (stop) return;
else throw new RuntimeException("Cleanup thread got interrupted",e);
}
//Do clean up work by invalidating all entries for expired keys
final Map<StaticBuffer,Long> expiredKeysCopy = new HashMap<>(expiredKeys.size());
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeys.entrySet()) {
if (isBeyondExpirationTime(expKey.getValue()))
expiredKeys.remove(expKey.getKey(), expKey.getValue());
else if (getAge(expKey.getValue())>= invalidationGracePeriodMS)
expiredKeysCopy.put(expKey.getKey(),expKey.getValue());
}
for (KeySliceQuery ksq : cache.asMap().keySet()) {
if (expiredKeysCopy.containsKey(ksq.getKey())) cache.invalidate(ksq);
}
penaltyCountdown = new CountDownLatch(PENALTY_THRESHOLD);
for (Map.Entry<StaticBuffer,Long> expKey : expiredKeysCopy.entrySet()) {
expiredKeys.remove(expKey.getKey(),expKey.getValue());
}
clearExpiredCache(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ protected void incActionBy(int by, CacheMetricsAction action, StoreTransaction t

public abstract void clearCache();

protected abstract void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries);
public abstract void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries);

public abstract void forceClearExpiredCache();

@Override
public void mutate(StaticBuffer key, List<Entry> additions, List<StaticBuffer> deletions, StoreTransaction txh) throws BackendException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ public void clearCache() {
}

@Override
protected void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries) {
public void invalidate(StaticBuffer key, List<CachableStaticBuffer> entries) {
}

@Override
public void forceClearExpiredCache() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.janusgraph.diskstorage.util.StaticArrayEntry;
import org.janusgraph.diskstorage.util.time.TimestampProvider;
import org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration;
import org.janusgraph.graphdb.database.cache.CacheInvalidationService;
import org.janusgraph.graphdb.database.cache.KCVSCacheInvalidationService;
import org.janusgraph.graphdb.database.cache.SchemaCache;
import org.janusgraph.graphdb.database.idassigner.VertexIDAssigner;
import org.janusgraph.graphdb.database.idhandling.IDHandler;
Expand Down Expand Up @@ -160,6 +162,8 @@ public class StandardJanusGraph extends JanusGraphBlueprintsGraph {
private final IDManager idManager;
private final VertexIDAssigner idAssigner;
private final TimestampProvider times;
private final CacheInvalidationService cacheInvalidationService;


//Serializers
protected final IndexSerializer indexSerializer;
Expand Down Expand Up @@ -200,6 +204,9 @@ public StandardJanusGraph(GraphDatabaseConfiguration configuration) {
this.idAssigner = config.getIDAssigner(backend);
this.idManager = idAssigner.getIDManager();

this.cacheInvalidationService = new KCVSCacheInvalidationService(
backend.getEdgeStoreCache(), backend.getIndexStoreCache(), idManager);

this.serializer = config.getSerializer();
StoreFeatures storeFeatures = backend.getStoreFeatures();
this.indexSerializer = new IndexSerializer(configuration.getConfiguration(), this.serializer,
Expand Down Expand Up @@ -298,6 +305,11 @@ public synchronized void close() throws JanusGraphException {
}
}

@Override
public CacheInvalidationService getDBCacheInvalidationService() {
return cacheInvalidationService;
}

private synchronized void closeInternal() {

if (!isOpen) return;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2022 JanusGraph Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package org.janusgraph.graphdb.database.cache;

import org.janusgraph.core.log.Change;
import org.janusgraph.diskstorage.StaticBuffer;

/**
* Cache invalidation service for manual JanusGraph database-level cache invalidation.
* Use with great care because improper invalidation may result in stale data left in db-cache.
* This service wraps two different caches called as `edgeStore` and `indexStore` which form a
* single database-level cache (can be enabled via `cache.db-cache` configuration property).
* When db-cache is disabled this service doesn't make any effective changes because the cache is
* fully disabled in that case.
* <p>
* This class provides method for data invalidation for both `edgeStore` cache and `indexStore` cache
* but invalidating entries in one cache doesn't invalidate entries in another cache.
* Thus, for proper invalidation you need to invalidate necessary keys for both `edgeStore` and `indexStore`.
* <p>
* EdgeStore accepts keys where a key is an encoded vertex id. It's usually easy to invalidate EdgeStore because
* it doesn't require you to know more information to form a key except knowing a vertex id.
* <p>
* IndexStore accepts keys where a key is an encoded IndexUpdate key. To form IndexUpdate key you need to know
* the next information: vertex id, updated property name, previous property value and / or new property value.
* Thus forming IndexUpdate key to invalidate IndexStore may be more complicated.
* <p>
* EdgeStore caches properties and edges for vertices.
* IndexStore caches results for queries which use indices.
* <p>
* See JavaDoc on methods to learn how to properly form a `key` to invalidate cache in `edgeStore` or `indexStore`.
*/
public interface CacheInvalidationService {

/**
* Marks specific vertex as expired in `edgeStore` cache.
* It will make sure that any retrieved properties and edges associated with this vertex will be invalidated in vertex cache.
* <p>
* Warning! This doesn't invalidate `indexStore` cache. Thus, any queries which are using indices may still return
* stale data. See {@link #markKeyAsExpiredInIndexStore(StaticBuffer)} JavaDoc to learn how to invalidate data for
* `indexStore`.
*
* @param vertexId vertex id to expire in `edgeStore` cache
*/
void markVertexAsExpiredInEdgeStore(Long vertexId);

/**
* Marks specific key as expired in `edgeStore` cache.
* It will make sure that any retrieved properties and edges associated with this key will be invalidated in vertex cache.
* <p>
* Warning! This doesn't invalidate `indexStore` cache. Thus, any queries which are using indices may still return
* stale data. See {@link #markKeyAsExpiredInIndexStore(StaticBuffer)} JavaDoc to learn how to invalidate data for
* `indexStore`.
* <p>
* {@link org.janusgraph.graphdb.idmanagement.IDManager#getKey(long)} can be used to form a `key` from vertex id.
* Alternatively, a method {@link #markVertexAsExpiredInEdgeStore(Long)} can be used which converts vertex id into
* the `key` before passing the key to this method.
* <p>
* In case vertices invalidation is needed by processing transaction logs via {@link org.janusgraph.core.log.ChangeState}
* then the method {@link org.janusgraph.core.log.ChangeState#getVertices(Change)} can be used to retrieve all
* changed vertices and passing their ids to {@link #markVertexAsExpiredInEdgeStore(Long)}.
*
* @param key key to expire in `edgeStore` cache
*/
void markKeyAsExpiredInEdgeStore(StaticBuffer key);

/**
* Marks specific key as expired in `indexStore` cache.
* It will make sure that any retrieved data associated with this key will be invalidated in index cache.
* <p>
* Warning! This doesn't invalidate `edgeStore` cache. Thus, trying to return properties or edges for the vertex
* may still return stale data. See {@link #markKeyAsExpiredInEdgeStore(StaticBuffer)} JavaDoc to learn how to invalidate
* data for `edgeStore`.
* <p>
* To form a `key` for invalidation it is needed to know vertex id, updated property name, previous value (if there was any),
* new value (if there is any) from which IndexUpdate key can be formed.
* <p>
* Usually this information can be found in a retrieved mutation logs which are passed via {@link org.janusgraph.core.log.ChangeState}
* (described in `Transaction Log` documentation of JanusGraph). In case `indexStore` invalidation should triggered
* from processing transaction logs via `ChangeState` then it can be done using an example like below:
*
* <pre>
* IndexSerializer indexSerializer = (StandardJanusGraph) graph.getIndexSerializer();
* CacheInvalidationService invalidationService = graph.getDBCacheInvalidationService();
* changeState.getRelations(Change.ANY).forEach(janusGraphRelation -> {
* if(janusGraphRelation.isProperty() && janusGraphRelation instanceof InternalRelation){
* JanusGraphVertexProperty property = (JanusGraphVertexProperty) janusGraphRelation;
* if(property.element() instanceof InternalVertex){
* Collection&lt;IndexUpdate&gt; indexUpdates = indexSerializer.getIndexUpdates((InternalVertex) property.element(),
* Collections.singleton((InternalRelation) property));
*
* for(IndexUpdate indexUpdate : indexUpdates){
* StaticBuffer keyToInvalidate = (StaticBuffer) indexUpdate.getKey();
* invalidationService.markKeyAsExpiredInIndexStore(keyToInvalidate);
* }
*
* invalidationService.forceClearExpiredKeysInIndexStoreCache();
* invalidationService.forceInvalidateVertexInEdgeStoreCache(property.element().longId());
* }
* }
* });
* </pre>
*
* It is also possible to trigger `indexStore` invalidation by forming vertex and a property yourself. The example
* below can be used as a reference.
*
* <pre>
* public void invalidateUpdatedVertexProperty(StandardJanusGraph graph, long vertexIdUpdated, String propertyNameUpdated, Object previousPropertyValue, Object newPropertyValue){
* JanusGraphTransaction tx = graph.newTransaction();
* JanusGraphManagement graphMgmt = graph.openManagement();
* PropertyKey propertyKey = graphMgmt.getPropertyKey(propertyNameUpdated);
* CacheVertex cacheVertex = new CacheVertex((StandardJanusGraphTx) tx, vertexIdUpdated, ElementLifeCycle.Loaded);
* StandardVertexProperty propertyPreviousVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, previousPropertyValue, ElementLifeCycle.Removed);
* StandardVertexProperty propertyNewVal = new StandardVertexProperty(propertyKey.longId(), propertyKey, cacheVertex, newPropertyValue, ElementLifeCycle.New);
* IndexSerializer indexSerializer = graph.getIndexSerializer();
*
* Collection&lt;IndexUpdate&gt; indexUpdates = indexSerializer.getIndexUpdates(cacheVertex, Arrays.asList(propertyPreviousVal, propertyNewVal));
* CacheInvalidationService invalidationService = graph.getDBCacheInvalidationService();
*
* for(IndexUpdate indexUpdate : indexUpdates){
* StaticBuffer keyToInvalidate = (StaticBuffer) indexUpdate.getKey();
* invalidationService.markKeyAsExpiredInIndexStore(keyToInvalidate);
* }
*
* invalidationService.forceClearExpiredKeysInIndexStoreCache();
* invalidationService.forceInvalidateVertexInEdgeStoreCache(vertexIdUpdated);
*
* graphMgmt.rollback();
* tx.rollback();
* }
* </pre>
*
* @param key key to expire in `indexStore` cache
*/
void markKeyAsExpiredInIndexStore(StaticBuffer key);

/**
* Instead of waiting for a probabilistic invalidation it triggers all cached queries scan and invalidation in `edgeStore`.
* This will remove any cached expired data.
*/
void forceClearExpiredKeysInEdgeStoreCache();

/**
* Instead of waiting for a probabilistic invalidation it triggers all cached queries scan and invalidation in `indexStore`.
* This will remove any cached expired data.
*/
void forceClearExpiredKeysInIndexStoreCache();

/**
* Marks a vertex as expired in `edgeStore` cache ({@link #markVertexAsExpiredInEdgeStore(Long)}) and triggers force
* clear of expired cache (i.e. {@link #forceClearExpiredKeysInEdgeStoreCache()})
*
* @param vertexId vertex id to invalidate in `edgeStore` cache
*/
void forceInvalidateVertexInEdgeStoreCache(Long vertexId);

/**
* Marks vertices as expired in `edgeStore` cache ({@link #markVertexAsExpiredInEdgeStore(Long)}) and triggers force
* clear of expired cache (i.e. {@link #forceClearExpiredKeysInEdgeStoreCache()})
*
* @param vertexIds vertex ids to invalidate in `edgeStore` cache
*/
void forceInvalidateVerticesInEdgeStoreCache(Iterable<Long> vertexIds);

/**
* Clears `edgeStore` cache fully
*/
void clearEdgeStoreCache();

/**
* Clears `indexStore` cache fully
*/
void clearIndexStoreCache();

/**
* Clears both `edgeStore` cache and `indexStore` cache fully.
* It is the same as calling {@link #clearEdgeStoreCache()} and {@link #clearIndexStoreCache()}
*/
void clearDBCache();

}

1 comment on commit 8d4cdbf

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark

Benchmark suite Current: 8d4cdbf Previous: d688d82 Ratio
org.janusgraph.MgmtOlapJobBenchmark.runRemoveIndex 114.09884923454544 ms/op 114.90347956363637 ms/op 0.99
org.janusgraph.JanusGraphSpeedBenchmark.basicAddAndDelete 18324.320372602982 ms/op 16912.67303256719 ms/op 1.08
org.janusgraph.GraphCentricQueryBenchmark.getVertices 2141.921422949411 ms/op 2244.533148796801 ms/op 0.95
org.janusgraph.MgmtOlapJobBenchmark.runReindex 429.1107533446154 ms/op 403.35688975634366 ms/op 1.06
org.janusgraph.JanusGraphSpeedBenchmark.basicCount 303.09688068754866 ms/op 369.41181090112127 ms/op 0.82
org.janusgraph.CQLMultiQueryBenchmark.getNeighborNames 59808.19720566667 ms/op 46939.17807455001 ms/op 1.27
org.janusgraph.CQLMultiQueryBenchmark.getNames 58117.66215536666 ms/op 48136.52566508333 ms/op 1.21

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.