Skip to content

Commit

Permalink
Merge pull request #10 from mytang0/config
Browse files Browse the repository at this point in the history
Configuration management
  • Loading branch information
mytang0 committed Apr 24, 2024
2 parents 5b34328 + 54046dd commit 423d928
Show file tree
Hide file tree
Showing 28 changed files with 787 additions and 124 deletions.
5 changes: 5 additions & 0 deletions brook-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
package xyz.mytang0.brook.common.extension;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xyz.mytang0.brook.common.annotation.OrderComparator;
import xyz.mytang0.brook.common.extension.injector.ExtensionInjector;
import xyz.mytang0.brook.common.extension.loading.LoadingStrategy;
import xyz.mytang0.brook.common.utils.Holder;
import xyz.mytang0.brook.common.utils.ReflectUtils;
import xyz.mytang0.brook.common.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
Expand Down Expand Up @@ -229,15 +227,15 @@ public List<T> getExtensionInstances() {
return (List<T>) this.cachedInstances.values()
.stream()
.map(Holder::get)
.sorted(OrderComparator.INSTANCE)
.collect(Collectors.toList());
}
// Dynamic Loading
List<T> instances = new ArrayList<>();
extensionClasses.keySet().forEach(name ->
Optional.ofNullable(this.getExtension(name))
.ifPresent(instances::add)
);
return instances;
return extensionClasses.keySet()
.stream()
.map(this::getExtension)
.sorted(OrderComparator.INSTANCE)
.collect(Collectors.toList());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
Expand Down Expand Up @@ -179,17 +180,38 @@ public static Object getValueByName(Object obj, String fieldName)

public static Field getFieldByName(Object obj, String fieldName) {
Class<?> superClass = obj.getClass();
return getFieldByName(superClass, fieldName);
}

while (superClass != Object.class) {
public static Field getFieldByName(Class<?> clazz, String fieldName) {
while (clazz != Object.class) {
try {
return superClass.getDeclaredField(fieldName);
return clazz.getDeclaredField(fieldName);
} catch (NoSuchFieldException var4) {
superClass = superClass.getSuperclass();
clazz = clazz.getSuperclass();
}
}
return null;
}

public static void setFieldValue(Method method, Object instance, Object value) {
boolean isInaccessible = !method.isAccessible();
if (isInaccessible) {
method.setAccessible(true);
}
try {
method.invoke(instance, value);
} catch (IllegalAccessException
| IllegalArgumentException
| InvocationTargetException e) {
throw new RuntimeException(method.getName(), e);
} finally {
if (isInaccessible) {
method.setAccessible(false);
}
}
}

public static Map<String, Object> objectToMap(Object fromValue) {
return JsonUtils.convertValue(fromValue, new TypeReference<Map<String, Object>>() {
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -22,6 +23,7 @@ public abstract class JsonUtils {
private static final ObjectMapper DEFAULT = new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.configure(FAIL_ON_UNRESOLVED_OBJECT_IDS, false)
.registerModule(new JavaTimeModule())
.setSerializationInclusion(JsonInclude.Include.NON_NULL);

private static volatile ObjectMapper objectMapper = DEFAULT;
Expand All @@ -30,6 +32,10 @@ public static void setObjectMapper(ObjectMapper objectMapper) {
JsonUtils.objectMapper = Optional.ofNullable(objectMapper).orElse(DEFAULT);
}

public static ObjectMapper getObjectMapper() {
return objectMapper;
}

public static String toJsonString(Object object) {
Objects.requireNonNull(objectMapper);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,21 @@ private static void getAllInterfaces(Class<?> cls,
cls = cls.getSuperclass();
}
}

public static List<Method> getSetterMethods(Class<?> cls) {
List<Method> setters = new ArrayList<>();
Method[] methods = cls.getMethods();
for (Method method : methods) {
if (isSetter(method)) {
setters.add(method);
}
}
return setters;
}

private static boolean isSetter(Method method) {
return method.getName().startsWith("set")
&& method.getParameterCount() == 1
&& method.getReturnType().equals(void.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,11 @@ public static boolean isPrimitives(Class<?> cls) {
public static boolean isPrimitive(Class<?> cls) {
return ClassUtils.isPrimitiveOrWrapper(cls);
}

public static boolean isJdkClass(Class<?> cls) {
String packageName = cls.getPackage().getName();
return packageName.startsWith("java.") ||
packageName.startsWith("javax.") ||
packageName.startsWith("com.sun.");
}
}
47 changes: 22 additions & 25 deletions brook-core/src/main/java/xyz/mytang0/brook/core/FlowExecutor.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package xyz.mytang0.brook.core;

import com.google.common.base.Joiner;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import xyz.mytang0.brook.common.configuration.Configuration;
import xyz.mytang0.brook.common.constants.Delimiter;
import xyz.mytang0.brook.common.context.FlowContext;
Expand All @@ -26,6 +32,7 @@
import xyz.mytang0.brook.core.exception.TerminateException;
import xyz.mytang0.brook.core.execution.ExecutionProperties;
import xyz.mytang0.brook.core.lock.FlowLockFacade;
import xyz.mytang0.brook.core.lock.LockProperties;
import xyz.mytang0.brook.core.metadata.MetadataFacade;
import xyz.mytang0.brook.core.metadata.MetadataProperties;
import xyz.mytang0.brook.core.monitor.DelayedTaskMonitor;
Expand All @@ -34,17 +41,12 @@
import xyz.mytang0.brook.spi.cache.FlowCache;
import xyz.mytang0.brook.spi.cache.FlowCacheFactory;
import xyz.mytang0.brook.spi.computing.EngineActuator;
import xyz.mytang0.brook.spi.config.Configurator;
import xyz.mytang0.brook.spi.execution.ExecutionDAO;
import xyz.mytang0.brook.spi.executor.ExecutorFactory;
import xyz.mytang0.brook.spi.metadata.MetadataService;
import xyz.mytang0.brook.spi.queue.QueueService;
import xyz.mytang0.brook.spi.task.FlowTask;
import com.google.common.base.Joiner;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;

import javax.validation.ValidationException;
import java.util.ArrayList;
Expand All @@ -64,6 +66,7 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import static xyz.mytang0.brook.core.constants.FlowConstants.DEFAULT_ENGINE_TYPE;
import static xyz.mytang0.brook.core.constants.FlowConstants.DEFAULT_TIMEOUT_MS;
import static xyz.mytang0.brook.core.constants.FlowConstants.LOCK_TRY_TIME_MS;
Expand All @@ -72,7 +75,6 @@
import static xyz.mytang0.brook.core.exception.FlowErrorCode.FLOW_EXECUTION_ERROR;
import static xyz.mytang0.brook.core.exception.FlowErrorCode.FLOW_NOT_EXIST;
import static xyz.mytang0.brook.core.exception.FlowErrorCode.TASK_NOT_EXIST;
import static xyz.mytang0.brook.core.executor.ExecutorEnum.ASYNC_EXECUTOR;
import static xyz.mytang0.brook.core.executor.ExecutorEnum.FLOW_STARTER;
import static xyz.mytang0.brook.core.utils.ParameterUtils.flowContext;
import static xyz.mytang0.brook.core.utils.ParameterUtils.getFlowInput;
Expand All @@ -81,7 +83,6 @@
import static xyz.mytang0.brook.core.utils.ParameterUtils.getTaskInput;
import static xyz.mytang0.brook.core.utils.ParameterUtils.getTaskOutput;
import static xyz.mytang0.brook.core.utils.QueueUtils.getTaskDelayQueueName;
import static java.util.Objects.requireNonNull;

@Slf4j
public class FlowExecutor<T extends FlowTask> {
Expand All @@ -96,8 +97,6 @@ public class FlowExecutor<T extends FlowTask> {

private final ExecutorService flowStarter;

private final ExecutorService asyncExecutor;

private final FlowCacheFactory flowCacheFactory;

private final FlowAspect flowAspect;
Expand All @@ -111,17 +110,19 @@ public class FlowExecutor<T extends FlowTask> {
private final DelayedTaskMonitorProperties delayedTaskMonitorProperties;


public FlowExecutor(FlowLockFacade flowLockFacade,
FlowTaskRegistry<T> flowTaskRegistry,
QueueProperties queueProperties,
MetadataProperties metadataProperties,
ExecutionProperties executionProperties,
DelayedTaskMonitorProperties delayedTaskMonitorProperties) {
this.flowLockFacade = flowLockFacade;
public FlowExecutor(FlowTaskRegistry<T> flowTaskRegistry) {
Configurator configurator = ExtensionDirector
.getExtensionLoader(Configurator.class)
.getDefaultExtension();
this.flowLockFacade = new FlowLockFacade(
configurator.getConfig(LockProperties.class)
);
this.flowTaskRegistry = flowTaskRegistry;
this.flowAspect = new FlowAspect();
this.taskAspect = new TaskAspect();
this.metadataService = new MetadataFacade(metadataProperties);
this.metadataService = new MetadataFacade(
configurator.getConfig(MetadataProperties.class)
);
this.engineActuator = ExtensionDirector
.getExtensionLoader(EngineActuator.class)
.getDefaultExtension();
Expand All @@ -132,13 +133,9 @@ public FlowExecutor(FlowLockFacade flowLockFacade,
.getExtensionLoader(ExecutorFactory.class)
.getDefaultExtension()
.getExecutor(FLOW_STARTER);
this.asyncExecutor = ExtensionDirector
.getExtensionLoader(ExecutorFactory.class)
.getDefaultExtension()
.getExecutor(ASYNC_EXECUTOR);
this.queueProperties = queueProperties;
this.executionProperties = executionProperties;
this.delayedTaskMonitorProperties = delayedTaskMonitorProperties;
this.queueProperties = configurator.getConfig(QueueProperties.class);
this.executionProperties = configurator.getConfig(ExecutionProperties.class);
this.delayedTaskMonitorProperties = configurator.getConfig(DelayedTaskMonitorProperties.class);
DelayedTaskMonitor.init(this, flowLockFacade, delayedTaskMonitorProperties);
}

Expand Down
Loading

0 comments on commit 423d928

Please sign in to comment.