Skip to content

Commit

Permalink
Merge pull request #18 from salesforce/ash_remove_longId
Browse files Browse the repository at this point in the history
Revert "Long metric id support"
  • Loading branch information
ashnacoder committed Oct 30, 2020
2 parents bbc4652 + 73d0d3a commit 369d29a
Show file tree
Hide file tree
Showing 42 changed files with 225 additions and 982 deletions.
2 changes: 2 additions & 0 deletions carbonj.service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ RUN yum update -y && \
sysstat \
epel-release

RUN yum install -y gcc-c++ gcc make libtool automake autoconf make python3-devel

RUN rpm --import http://repos.azulsystems.com/RPM-GPG-KEY-azulsystems && \
curl -o /etc/yum.repos.d/zulu.repo http://repos.azulsystems.com/rhel/zulu.repo && \
yum update -y && \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
Expand Down Expand Up @@ -60,10 +59,6 @@ public class CarbonjAdmin

private final NameUtils nameUtils;

@Value( "${metrics.store.longId:false}" )
private boolean longId;


private Supplier<RuntimeException> notConfigured = ( ) -> new RuntimeException(
"Time Series Store is not configured." );

Expand Down Expand Up @@ -125,7 +120,7 @@ public void listMetrics2( @PathVariable final String pattern, Writer response )
}

@RequestMapping( value = "/dumpnames", method = RequestMethod.GET )
public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) long startId,
public void dumpNames( @RequestParam( value = "startId", required = false, defaultValue = "0" ) int startId,
@RequestParam( value = "startName", required = false ) String startName,
@RequestParam( value = "count", required = false ) Integer count,
@RequestParam( value = "filter", required = false ) String wildcard, Writer response )
Expand All @@ -143,7 +138,7 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
}
try
{
tsStore().scanMetrics( startId, getMaxId(), m -> {
tsStore().scanMetrics( startId, Integer.MAX_VALUE, m -> {
if ( !filter.test( m ) )
{
return;
Expand All @@ -169,10 +164,6 @@ public void dumpNames( @RequestParam( value = "startId", required = false, defau
}
}

private long getMaxId() {
return longId ? Long.MAX_VALUE : Integer.MAX_VALUE;
}

private boolean loadLock = false;

private volatile boolean abortLoad = false;
Expand Down Expand Up @@ -537,7 +528,6 @@ static class StopException

static boolean hasDataSince( TimeSeriesStore ts, String metric, int from )
{

for ( String dbName : Arrays.asList( "30m2y", "5m7d", "60s24h" ) )
{
if ( null != ts.getFirst( dbName, metric, from, Integer.MAX_VALUE ) )
Expand All @@ -564,7 +554,7 @@ public void cleanSeries( @RequestParam( value = "from", required = false, defaul

try
{
ts.scanMetrics( 0, getMaxId(), m -> {
ts.scanMetrics( 0, Integer.MAX_VALUE, m -> {
if ( written.get() >= count )
{
// produced big enough result - interrupt execution through exception (signal "donness")
Expand Down Expand Up @@ -632,7 +622,7 @@ public void dumpSeries( @PathVariable final String dbName,
try
{
ts.scanMetrics( cursor,
getMaxId(),
Integer.MAX_VALUE,
m -> {
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public interface TimeSeriesStore

DataPointExportResults exportPoints( String dbName, String metricName );

DataPointExportResults exportPoints( String dbName, long metricId );
DataPointExportResults exportPoints( String dbName, int metricId );

// to support testing
Metric selectRandomMetric();
Expand All @@ -47,13 +47,13 @@ public interface TimeSeriesStore

Metric getMetric( String name, boolean createIfMissing );

Metric getMetric( long metricId );
Metric getMetric( int metricId );

String getMetricName( long metricId );
String getMetricName( int metricId );

void scanMetrics( Consumer<Metric> m );

long scanMetrics( long start, long end, Consumer<Metric> m );
int scanMetrics( int start, int end, Consumer<Metric> m );

List<Metric> findMetrics( String pattern );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ public class TimeSeriesStoreImpl implements TimeSeriesStore

private volatile long logNoOfSeriesThreshold;

private boolean longId;

public static ThreadPoolExecutor newSerialTaskQueue(int queueSize) {
ThreadFactory tf =
new ThreadFactoryBuilder()
Expand Down Expand Up @@ -144,8 +142,7 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
ThreadPoolExecutor heavyQueryTaskQueue, ThreadPoolExecutor serialTaskQueue,
DataPointStore pointStore, DatabaseMetrics dbMetrics,
boolean batchedSeriesRetrieval, int batchedSeriesSize, boolean dumpIndex,
File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile,
boolean longId) {
File dumpIndexFile, int maxNonLeafPointsLoggedPerMin, String metricsStoreConfigFile) {
this.nameIndex = Preconditions.checkNotNull(nameIndex);
this.eventLogger = eventLogger;
this.pointStore = Preconditions.checkNotNull(pointStore);
Expand All @@ -157,7 +154,6 @@ public TimeSeriesStoreImpl(MetricRegistry metricRegistry, MetricIndex nameIndex,
this.dumpIndex = dumpIndex;
this.dumpIndexFile = dumpIndexFile;
this.nonLeafPointsLogQuota = new Quota(maxNonLeafPointsLoggedPerMin, 60);
this.longId = longId;


rejectedCounter = metricRegistry.counter(
Expand Down Expand Up @@ -362,11 +358,11 @@ public DataPointExportResults exportPoints(String dbName, String metricName) {
}

@Override
public DataPointExportResults exportPoints(String dbName, long metricId) {
public DataPointExportResults exportPoints(String dbName, int metricId) {
return exportPoints(dbName, null, metricId);
}

private DataPointExportResults exportPoints(String dbName, String metricName, Long metricId) {
private DataPointExportResults exportPoints(String dbName, String metricName, Integer metricId) {
if (!RetentionPolicy.dbNameExists(dbName)) {
throw new RuntimeException(String.format("Unknown dbName [%s]", dbName));
}
Expand Down Expand Up @@ -645,13 +641,13 @@ public DeleteAPIResult deleteAPI( String name, boolean delete, Set<String> exclu
}

@Override
public Metric getMetric( long metricId )
public Metric getMetric( int metricId )
{
return nameIndex.getMetric( metricId );
}

@Override
public String getMetricName( long metricId )
public String getMetricName( int metricId )
{
return nameIndex.getMetricName( metricId );
}
Expand All @@ -668,18 +664,11 @@ public void deleteAll()
@Override
public void scanMetrics( Consumer<Metric> m )
{
if(longId)
{
scanMetrics( 0, Long.MAX_VALUE, m );
}
else
{
scanMetrics( 0, Integer.MAX_VALUE, m );
}
scanMetrics( 0, Integer.MAX_VALUE, m );
}

@Override
public long scanMetrics( long start, long end, Consumer<Metric> m )
public int scanMetrics( int start, int end, Consumer<Metric> m )
{
return nameIndex.scanNames( start, end, m );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
*/
package com.demandware.carbonj.service.db;

import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.MetricRegistry;
import com.demandware.carbonj.service.db.index.cfgMetricIndex;
import com.demandware.carbonj.service.db.model.DataPointStore;
import com.demandware.carbonj.service.db.model.MetricIndex;
import com.demandware.carbonj.service.db.points.cfgDataPoints;
import com.demandware.carbonj.service.db.util.DatabaseMetrics;
import com.demandware.carbonj.service.engine.cfgCentralThreadPools;
import com.demandware.carbonj.service.events.EventsLogger;
import com.demandware.carbonj.service.events.cfgCarbonjEventsLogger;
Expand All @@ -22,12 +21,15 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Import;

import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.demandware.carbonj.service.db.index.cfgMetricIndex;
import com.demandware.carbonj.service.db.model.DataPointStore;
import com.demandware.carbonj.service.db.model.MetricIndex;
import com.demandware.carbonj.service.db.points.cfgDataPoints;
import com.demandware.carbonj.service.db.util.DatabaseMetrics;

@Import( { cfgMetricIndex.class, cfgDataPoints.class, cfgCentralThreadPools.class, cfgCarbonjEventsLogger.class } )
@ConditionalOnProperty(name=cfgTimeSeriesStorage.DB_ENABLED_PROPERTY_KEY, havingValue="true", matchIfMissing=true)
Expand All @@ -37,9 +39,6 @@ public class cfgTimeSeriesStorage

public static final String DB_ENABLED_PROPERTY_KEY = "metrics.store.enabled";

@Value( "${metrics.store.longId:false}" )
private boolean longId;

@Value( "${metrics.store.fetchSeriesThreads:20}" )
private int nTaskThreads;

Expand Down Expand Up @@ -86,8 +85,7 @@ TimeSeriesStore timeSeriesStore( MetricIndex nameIndex, DataPointStore pointStor
TimeSeriesStoreImpl.newHeavyQueryTaskQueue( nHeavyQueryThreads, heavyQueryBlockingQueueSize ),
TimeSeriesStoreImpl.newSerialTaskQueue( serialQueueSize ), pointStore,
dbMetrics, batchedSeriesRetrieval,
batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile,
longId);
batchedSeriesSize, dumpIndex, new File( dumpIndexFile ), maxNonLeafPointsLoggedPerMin, metricStoreConfigFile);

s.scheduleWithFixedDelay(timeSeriesStore::reload, 60, 60, TimeUnit.SECONDS );
s.scheduleWithFixedDelay(timeSeriesStore::refreshStats, 60, 10, TimeUnit.SECONDS );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@
import com.google.common.base.Preconditions;

class IdRecord
implements Record<Long>
implements Record<Integer>
{
private Long key;
private Integer key;

private String metricName;

public IdRecord( Long key, String metricName)
public IdRecord( Integer key, String metricName)
{
this.key = Preconditions.checkNotNull(key);
this.metricName = Preconditions.checkNotNull(metricName);
}

public Long key()
public Integer key()
{
return key;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,64 +6,47 @@
*/
package com.demandware.carbonj.service.db.index;

import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

import static java.nio.charset.StandardCharsets.UTF_8;

class IdRecordSerializer
implements RecordSerializer<Long, IdRecord>
implements RecordSerializer<Integer, IdRecord>
{
private boolean longId;

public IdRecordSerializer(boolean longId)
public IdRecordSerializer()
{
this.longId = longId;
}

@Override
public Long key( byte[] keyBytes )
public Integer key( byte[] keyBytes )
{
return longId ? Longs.fromByteArray( keyBytes ) : Integer.valueOf(Ints.fromByteArray(keyBytes)).longValue();
return Ints.fromByteArray( keyBytes );
}

@Override
public IdRecord toIndexEntry( byte[] keyBytes, byte[] valueBytes)
{
Long key = key(keyBytes);
Integer key = key(keyBytes);
return toIndexEntry( key, valueBytes);
}

@Override
public IdRecord toIndexEntry( Long key, byte[] valueBytes)
public IdRecord toIndexEntry( Integer key, byte[] valueBytes)
{
ByteArrayDataInput in = ByteStreams.newDataInput( valueBytes );
if(longId)
{
// a byte for versioning
byte entryType = in.readByte();
}
return new IdRecord( key, in.readUTF() );
String value = new String(valueBytes, UTF_8);
return new IdRecord( key, value );
}

@Override
public byte[] keyBytes(Long key)
public byte[] keyBytes(Integer key)
{
return longId ? Longs.toByteArray(key) : Ints.toByteArray(key.intValue());
return Ints.toByteArray(key);
}

@Override
public byte[] valueBytes(IdRecord e)
{
ByteArrayDataOutput out = ByteStreams.newDataOutput();
if(longId)
{
// leaving a byte for versioning
out.writeByte(0);
}
out.writeUTF(e.metricName());
return out.toByteArray();
return e.metricName().getBytes( UTF_8 );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ public interface IndexStore<K, R extends Record<K>>

K maxKey();

long scan( K startKey, K endKey, Consumer<R> c );
int scan( K startKey, K endKey, Consumer<R> c );
}
Loading

0 comments on commit 369d29a

Please sign in to comment.