diff --git a/README.md b/README.md index ca4ad0b..e6caac8 100644 --- a/README.md +++ b/README.md @@ -250,6 +250,7 @@ Any event that fulfills the `actor.LogEvent` interface will be logged to the def message and the attributes of the event set by the `actor.LogEvent` `log()` method. ### List of internal system events +* `actor.ActorInitializedEvent`, an actor has been initialized but did not processed its `actor.Started message` * `actor.ActorStartedEvent`, an actor has started * `actor.ActorStoppedEvent`, an actor has stopped * `actor.DeadLetterEvent`, a message was not delivered to an actor diff --git a/actor/engine_test.go b/actor/engine_test.go index 6eed2d0..8f0e327 100644 --- a/actor/engine_test.go +++ b/actor/engine_test.go @@ -36,6 +36,16 @@ func newTickReceiver(wg *sync.WaitGroup) Producer { } } +func TestRegistryGetPID(t *testing.T) { + e, _ := NewEngine(nil) + expectedPID1 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("1")) + expectedPID2 := e.SpawnFunc(func(c *Context) {}, "foo", WithID("2")) + pid := e.Registry.GetPID("foo", "1") + assert.True(t, pid.Equals(expectedPID1)) + pid = e.Registry.GetPID("foo", "2") + assert.True(t, pid.Equals(expectedPID2)) +} + func TestSendToNilPID(t *testing.T) { e, _ := NewEngine(nil) e.Send(nil, "foo") diff --git a/actor/event.go b/actor/event.go index 209ff56..3162790 100644 --- a/actor/event.go +++ b/actor/event.go @@ -26,6 +26,17 @@ func (e ActorStartedEvent) Log() (slog.Level, string, []any) { return slog.LevelInfo, "Actor started", []any{"pid", e.PID} } +// ActorInitializedEvent is broadcasted over the eventStream before an actor +// received and processed its started event. +type ActorInitializedEvent struct { + PID *PID + Timestamp time.Time +} + +func (e ActorInitializedEvent) Log() (slog.Level, string, []any) { + return slog.LevelDebug, "Actor initialized", []any{"pid", e.PID} +} + // ActorStoppedEvent is broadcasted over the eventStream each time // a process is terminated. type ActorStoppedEvent struct { diff --git a/actor/process.go b/actor/process.go index 94f5730..de08d6a 100644 --- a/actor/process.go +++ b/actor/process.go @@ -126,6 +126,7 @@ func (p *process) Start() { }() p.context.message = Initialized{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) + p.context.engine.BroadcastEvent(ActorInitializedEvent{PID: p.pid, Timestamp: time.Now()}) p.context.message = Started{} applyMiddleware(recv.Receive, p.Opts.Middleware...)(p.context) diff --git a/actor/registry.go b/actor/registry.go index b9d8e04..0bf5869 100644 --- a/actor/registry.go +++ b/actor/registry.go @@ -19,6 +19,16 @@ func newRegistry(e *Engine) *Registry { } } +// GetPID returns the process id associated for the given kind and its id. +// GetPID returns nil if the process was not found. +func (r *Registry) GetPID(kind, id string) *PID { + proc := r.getByID(kind + pidSeparator + id) + if proc != nil { + return proc.PID() + } + return nil +} + // Remove removes the given PID from the registry. func (r *Registry) Remove(pid *PID) { r.mu.Lock() diff --git a/examples/persistance/main.go b/examples/persistance/main.go index c636301..684b238 100644 --- a/examples/persistance/main.go +++ b/examples/persistance/main.go @@ -1,15 +1,16 @@ package main import ( - "context" "encoding/json" "fmt" "log" + "os" + "path" + "regexp" "sync" "time" "github.com/anthdm/hollywood/actor" - "github.com/redis/go-redis/v9" ) type Storer interface { @@ -17,7 +18,7 @@ type Storer interface { Load(key string) ([]byte, error) } -func WithPersistance(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc { +func WithPersistence(store Storer) func(actor.ReceiveFunc) actor.ReceiveFunc { return func(next actor.ReceiveFunc) actor.ReceiveFunc { return func(c *actor.Context) { switch c.Message().(type) { @@ -114,23 +115,31 @@ func (p *PlayerState) State() ([]byte, error) { return json.Marshal(state) } -type RedisStore struct { - client *redis.Client +type fileStore struct { + path string } -func newRedisStore(c *redis.Client) *RedisStore { - return &RedisStore{ - client: c, +func newFileStore() *fileStore { + // make a tmp dir: + tmpdir := "/tmp/persistenceexample" + err := os.Mkdir(tmpdir, 0755) + if err != nil && !os.IsExist(err) { + log.Fatal(err) + } + return &fileStore{ + path: tmpdir, } } -func (r *RedisStore) Store(key string, state []byte) error { - return r.client.Set(context.TODO(), key, state, 0).Err() +// Store the state in a file name key +func (r *fileStore) Store(key string, state []byte) error { + key = safeFileName(key) + return os.WriteFile(path.Join(r.path, key), state, 0755) } -func (r *RedisStore) Load(key string) ([]byte, error) { - val, err := r.client.Get(context.TODO(), key).Result() - return []byte(val), err +func (r *fileStore) Load(key string) ([]byte, error) { + key = safeFileName(key) + return os.ReadFile(path.Join(r.path, key)) } func main() { @@ -139,13 +148,12 @@ func main() { log.Fatal(err) } var ( - redisClient = redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Password: "", // no password set - DB: 0, // use default DB - }) - store = newRedisStore(redisClient) - pid = e.Spawn(newPlayerState(100, "James"), "playerState", actor.WithMiddleware(WithPersistance(store))) + store = newFileStore() + pid = e.Spawn( + newPlayerState(100, "James"), + "playerState", + actor.WithMiddleware(WithPersistence(store)), + actor.WithID("james")) ) time.Sleep(time.Second * 1) e.Send(pid, TakeDamage{Amount: 9}) @@ -154,3 +162,11 @@ func main() { e.Poison(pid, wg) wg.Wait() } + +var safeRx = regexp.MustCompile(`[^a-zA-Z0-9]`) + +// safeFileName replaces all characters azAZ09 with _ +func safeFileName(s string) string { + res := safeRx.ReplaceAllString(s, "_") + return res +} diff --git a/go.mod b/go.mod index af7e09a..4977976 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/grandcat/zeroconf v1.0.0 github.com/planetscale/vtprotobuf v0.5.0 github.com/prometheus/client_golang v1.18.0 - github.com/redis/go-redis/v9 v9.3.1 github.com/stretchr/testify v1.8.4 google.golang.org/grpc v1.60.1 google.golang.org/protobuf v1.32.0 @@ -17,7 +16,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/klauspost/cpuid/v2 v2.0.9 // indirect github.com/kr/text v0.2.0 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 0dd5613..b9a8f97 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= @@ -48,8 +48,6 @@ github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lne github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= -github.com/redis/go-redis/v9 v9.3.1 h1:KqdY8U+3X6z+iACvumCNxnoluToB+9Me+TvyFa21Mds= -github.com/redis/go-redis/v9 v9.3.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/safemap/safemap.go b/safemap/safemap.go index a33b0ff..2f77c69 100644 --- a/safemap/safemap.go +++ b/safemap/safemap.go @@ -3,44 +3,45 @@ package safemap import "sync" type SafeMap[K comparable, V any] struct { - data sync.Map + mu sync.RWMutex + data map[K]V } func New[K comparable, V any]() *SafeMap[K, V] { return &SafeMap[K, V]{ - data: sync.Map{}, + data: make(map[K]V), } } func (s *SafeMap[K, V]) Set(k K, v V) { - s.data.Store(k, v) + s.mu.Lock() + defer s.mu.Unlock() + s.data[k] = v } func (s *SafeMap[K, V]) Get(k K) (V, bool) { - val, ok := s.data.Load(k) - var zero V - if !ok { - return zero, false - } - return val.(V), ok + s.mu.RLock() + defer s.mu.RUnlock() + val, ok := s.data[k] + return val, ok } func (s *SafeMap[K, V]) Delete(k K) { - s.data.Delete(k) + s.mu.Lock() + defer s.mu.Unlock() + delete(s.data, k) } func (s *SafeMap[K, V]) Len() int { - count := 0 - s.data.Range(func(_, _ interface{}) bool { - count++ - return true - }) - return count + s.mu.RLock() + defer s.mu.RUnlock() + return len(s.data) } func (s *SafeMap[K, V]) ForEach(f func(K, V)) { - s.data.Range(func(key, value interface{}) bool { - f(key.(K), value.(V)) - return true - }) + s.mu.RLock() + defer s.mu.RUnlock() + for k, v := range s.data { + f(k, v) + } }