Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vhvapm 444 #1653

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package rocks.inspectit.ocelot.core.metrics;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.opencensus.tags.TagContext;
import io.opencensus.tags.TagContextBuilder;
import io.opencensus.tags.TagKey;
import io.opencensus.tags.Tags;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -25,17 +21,13 @@
import rocks.inspectit.ocelot.core.instrumentation.context.InspectitContextImpl;
import rocks.inspectit.ocelot.core.instrumentation.hook.actions.IHookAction;
import rocks.inspectit.ocelot.core.instrumentation.hook.actions.model.MetricAccessor;
import rocks.inspectit.ocelot.core.metrics.tagGuards.PersistedTagsReaderWriter;
import rocks.inspectit.ocelot.core.selfmonitoring.AgentHealthManager;
import rocks.inspectit.ocelot.core.tags.CommonTagsManager;
import rocks.inspectit.ocelot.core.tags.TagUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.Future;
Expand All @@ -46,133 +38,48 @@
@Slf4j
public class MeasureTagValueGuard {
private static final String tagOverFlowMessageTemplate = "Overflow in measure %s for tag key %s";

/**
* Map of measure names and their related set of tag keys, which are currently blocked.
*/
private final Map<String, Set<String>> blockedTagKeysByMeasure = Maps.newHashMap();
PersistedTagsReaderWriter fileReaderWriter;
@Autowired
private InspectitEnvironment env;

@Autowired
private AgentHealthManager agentHealthManager;

/**
* Common tags manager needed for gathering common tags when recording metrics.
*/
@Autowired
private CommonTagsManager commonTagsManager;

@Autowired
private ScheduledExecutorService executor;

private PersistedTagsReaderWriter fileReaderWriter;

private volatile boolean isShuttingDown = false;

private boolean hasTagValueOverflow = false;

/**
* Map of measure names and their related set of tag keys, which are currently blocked.
*/
private final Map<String, Set<String>> blockedTagKeysByMeasure = Maps.newHashMap();

private Set<TagsHolder> latestTags = Collections.synchronizedSet(new HashSet<>());

private Future<?> blockTagValuesFuture;

@PostConstruct
protected void init() {
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();
if (!tagGuardSettings.isEnabled()) return;

fileReaderWriter = new PersistedTagsReaderWriter(tagGuardSettings.getDatabaseFile(), new ObjectMapper());
scheduleTagGuardJob();

log.info(String.format("TagValueGuard started with scheduleDelay %s and database file %s", tagGuardSettings.getScheduleDelay(), tagGuardSettings.getDatabaseFile()));
}

private void scheduleTagGuardJob() {
Duration tagGuardScheduleDelay = env.getCurrentConfig().getMetrics().getTagGuard().getScheduleDelay();
blockTagValuesFuture = executor.schedule(blockTagValuesTask, tagGuardScheduleDelay.toNanos(), TimeUnit.NANOSECONDS);
}


@PreDestroy
protected void stop() {
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;
if (isTagGuardDisabled()) {
return;
}

isShuttingDown = true;
blockTagValuesFuture.cancel(true);
}

/**
* Task, which reads the persisted tag values to determine, which tags should be blocked, because of exceeding
* the specific tag value limit.
* If new tags values have been created, they will be persisted.
*/
@VisibleForTesting
Runnable blockTagValuesTask = () -> {
if (!env.getCurrentConfig().getMetrics().getTagGuard().isEnabled()) return;

// read current tag value database
Map<String, Map<String, Set<String>>> availableTagsByMeasure = fileReaderWriter.read();

Set<TagsHolder> copy = latestTags;
latestTags = Collections.synchronizedSet(new HashSet<>());

// process new tags
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map<String, String> newTags = tagsHolder.getTags();
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());

Map<String, Set<String>> tagValuesByTagKey = availableTagsByMeasure.computeIfAbsent(measureName, k -> Maps.newHashMap());
newTags.forEach((tagKey, tagValue) -> {
Set<String> tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey);
if(isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
tagValues.add(tagValue);
}
});

});

fileReaderWriter.write(availableTagsByMeasure);

// remove all blocked tags, if no values are stored in the database file
if(availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear();

// independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit
availableTagsByMeasure.forEach((measureName, tags) -> {
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
tags.forEach((tagKey, tagValues) -> {
if(tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet())
.add(tagKey);
if(isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey);
}
});
});

// invalidate incident, if tag overflow was detected, but no more tags are blocked
boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty);
if(hasTagValueOverflow && noBlockedTagKeys) {
agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved");
hasTagValueOverflow = false;
}

if (!isShuttingDown) scheduleTagGuardJob();
};

/**
* Gets the max value amount per tag for the given measure by hierarchically extracting
* {@link MetricDefinitionSettings#maxValuesPerTag} (prio 1),
Expand All @@ -198,7 +105,8 @@ int getMaxValuesPerTag(String measureName, InspectitConfig config) {

/**
* Creates the full tag context, including all specified tags, for the current measure
* @param context current context
*
* @param context current context
* @param metricAccessor accessor for the measure as well as the particular tags
* @return TagContext including all tags for the current measure
*/
Expand Down Expand Up @@ -249,56 +157,131 @@ public TagContext getTagContext(IHookAction.ExecutionContext context, MetricAcce

}

@Value
@EqualsAndHashCode
private static class TagsHolder {
private boolean isTagGuardDisabled() {
return !env.getCurrentConfig().getMetrics().getTagGuard().isEnabled();
}

String measureName;
/**
* Task, which reads the persisted tag values to determine, which tags should be blocked, because of exceeding
* the specific tag value limit.
* If new tags values have been created, they will be persisted.
*/
@VisibleForTesting
Runnable blockTagValuesTask = () -> {
if (isNotWritable()) {
return;
}

Map<String, String> tags;
Map<String, Map<String, Set<String>>> storedTags = fileReaderWriter.read();
processNewTags(storedTags);
fileReaderWriter.write(storedTags);
removeBlockedTags(storedTags);

// invalidate incident, if tag overflow was detected, but no more tags are blocked
boolean noBlockedTagKeys = blockedTagKeysByMeasure.values().stream().allMatch(Set::isEmpty);
if (hasTagValueOverflow && noBlockedTagKeys) {
agentHealthManager.invalidateIncident(this.getClass(), "Overflow for tags resolved");
hasTagValueOverflow = false;
}

if (!isShuttingDown) scheduleTagGuardJob();
};

private boolean isNotWritable() {
if (isTagGuardDisabled()) {
return true;
}

initTagReaderWriter();
return Objects.isNull(fileReaderWriter);
}

private void initTagReaderWriter() {
final String filename = getFilename();
if (Objects.nonNull(filename)) {
fileReaderWriter = PersistedTagsReaderWriter.of(filename);
}
}

@AllArgsConstructor
EddeCCC marked this conversation as resolved.
Show resolved Hide resolved
static class PersistedTagsReaderWriter {

@NonNull
private String fileName;

@NonNull
private ObjectMapper mapper;

public Map<String, Map<String, Set<String>>> read() {
if (!StringUtils.isBlank(fileName)) {
Path path = Paths.get(fileName);
if (Files.exists(path)) {
try {
byte[] content = Files.readAllBytes(path);
@SuppressWarnings("unchecked") Map<String, Map<String, Set<String>>> tags = mapper.readValue(content, new TypeReference<Map<String, Map<String, Set<String>>>>() {
});
return tags;
} catch (Exception e) {
log.error("Error loading tag-guard database from persistence file '{}'", fileName, e);
private String getFilename() {
TagGuardSettings tagGuardSettings = env.getCurrentConfig().getMetrics().getTagGuard();
if (!tagGuardSettings.isEnabled()) {
log.error("Filename is not set. Not able to be writing tags.");
return null;
}
Comment on lines +207 to +211
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Das wird bereits vor getFilename() in isNotWritable() geprüft. Kann raus.


final String filename = tagGuardSettings.getDatabaseFile();
if (StringUtils.isBlank(filename)) {
log.error("Filename is empty. Not able to be writing tags.");
return null;
}

log.info(String.format("TagValueGuard started with scheduleDelay %s and database file %s", tagGuardSettings.getScheduleDelay(), tagGuardSettings.getDatabaseFile()));
return filename;
}

private void processNewTags(Map<String, Map<String, Set<String>>> storedTags) {
Set<TagsHolder> copy = latestTags;
latestTags = Collections.synchronizedSet(new HashSet<>());

// process new tags
copy.forEach(tagsHolder -> {
String measureName = tagsHolder.getMeasureName();
Map<String, String> newTags = tagsHolder.getTags();
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());

Map<String, Set<String>> tagValuesByTagKey = storedTags.computeIfAbsent(measureName, k -> Maps.newHashMap());
newTags.forEach((tagKey, tagValue) -> {
Set<String> tagValues = tagValuesByTagKey.computeIfAbsent(tagKey, (x) -> new HashSet<>());
// if tag value is new AND max values per tag is already reached
if (!tagValues.contains(tagValue) && tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet()).add(tagKey);
if (isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
log.info("Could not find tag-guard database file. File will be created during next write");
tagValues.add(tagValue);
}
}
return Maps.newHashMap();
}
});

public void write(Map<String, Map<String, Set<String>>> tagValues) {
if (!StringUtils.isBlank(fileName)) {
try {
Path path = Paths.get(fileName);
Files.createDirectories(path.getParent());
String tagValuesString = mapper.writeValueAsString(tagValues);
Files.write(path, tagValuesString.getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
log.error("Error writing tag-guard database to file '{}'", fileName, e);
});
}

private void removeBlockedTags(Map<String, Map<String, Set<String>>> availableTagsByMeasure) {
// remove all blocked tags, if no values are stored in the database file
if (availableTagsByMeasure.isEmpty()) blockedTagKeysByMeasure.clear();

// independent of processing new tags, check if tags should be blocked or unblocked due to their tag value limit
availableTagsByMeasure.forEach((measureName, tags) -> {
int maxValuesPerTag = getMaxValuesPerTag(measureName, env.getCurrentConfig());
tags.forEach((tagKey, tagValues) -> {
if (tagValues.size() >= maxValuesPerTag) {
boolean isNewBlockedTag = blockedTagKeysByMeasure.computeIfAbsent(measureName, measure -> Sets.newHashSet())
.add(tagKey);
if (isNewBlockedTag) {
agentHealthManager.handleInvalidatableHealth(AgentHealth.ERROR, this.getClass(),
String.format(tagOverFlowMessageTemplate, measureName, tagKey));
hasTagValueOverflow = true;
}
} else {
blockedTagKeysByMeasure.getOrDefault(measureName, Sets.newHashSet()).remove(tagKey);
}
}
}
});
});
}

@Value
@EqualsAndHashCode
private static class TagsHolder {
String measureName;
Map<String, String> tags;
}




}


Loading
Loading