Nessuna descrizione

event_distributor.go 5.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package funmow
  2. import "fmt"
// Event type codes stored in PlayerEvent.messageType. The values are assigned
// by iota in declaration order starting at 0, so inserting or reordering
// entries changes the numeric code of every constant that follows.
const (
	// Intended for everyone in the containing object; Run broadcasts it to
	// every registered player, source included.
	EventTypeEmit = iota
	// Intended for everyone in the containing object except the player; Run
	// broadcasts it to every registered player except the source.
	EventTypeOEmit
	// Only the player hears it; Run forwards it to the destination player's
	// exec channel.
	EventTypePEmit
	// Broadcast message; Run broadcasts it to every registered player.
	EventTypeWall
	// Run forwards it to the destination player's exec channel.
	EventTypePage
	// Run broadcasts it to every registered player except the source.
	EventTypeSay
	// Run broadcasts it to every registered player, source included.
	EventTypePose
	// Numeric value 7. Sent from a connection to an execution context.
	EventTypeCommand
	// Numeric value 8. Sent from an execution context to the player's
	// connections.
	EventTypeOutput
	// Sent from an execution context to one specific connection.
	EventTypeQuit
	// Sent by the client side when a connection is dropped.
	EventTypeTeardown
	// Makes Run remove the destination player's registration.
	EventTypeTeardownComplete
	// Run hands it to the force context's inbound channel.
	EventTypeForce
)
// PlayerEvent is the message value carried on every channel the event
// distributor reads from or writes to.
type PlayerEvent struct {
	// Identifies one connection of the destination player; Run uses it to
	// select the connection channel for EventTypeQuit and EventTypeTeardown.
	connectionID int
	// Source object; broadcastEvent compares it with player IDs to skip the
	// sender when asked to.
	src DBRef
	// Destination; Run uses it as the key into the registered-players map.
	dst DBRef
	// One of the EventType* constants; Run dispatches on it.
	messageType int
	// Payload text. The distributor passes it along without reading it.
	message string
}
// playerRegistration is the distributor's bookkeeping for one player: a
// single execution context plus the channels of all its connections.
type playerRegistration struct {
	// Execution context created by Run the first time the player subscribes.
	execContext *ExecutionContext
	// Per-connection delivery channels, keyed by connection ID. Each channel
	// is the inbound channel supplied in the EventSubscribeRequest.
	connInbound map[int]chan PlayerEvent
	// Channel returned by execContext.StartInboundChannel(). Run sends
	// events to the exec on it and closes it to signal the exec to stop.
	execInbound chan PlayerEvent
}
// EventSubscribeRequest asks the distributor to register one connection for
// a player. It is passed to EventDistributor.Subscribe.
type EventSubscribeRequest struct {
	// Key under which the connection's channel is stored for the player.
	connectionID int
	// Player the connection belongs to.
	playerID DBRef
	// Channel on which the distributor delivers events to this connection.
	inbound chan PlayerEvent
	// Reply channel: Run sends back the shared channel on which events are
	// submitted to the distributor, and Subscribe receives it from here.
	outbound chan chan PlayerEvent
}
// EventDistributor routes PlayerEvents between player connections and
// per-player execution contexts. Its loop is started with Run.
type EventDistributor struct {
	// Carries subscription requests from Subscribe to the Run loop.
	subscribeChan chan EventSubscribeRequest
	// Database handle passed to the execution and force contexts Run creates.
	db *DB
	// Registered players keyed by player ID. Allocated by Run, so it is nil
	// until Run has started.
	players map[DBRef]*playerRegistration
}
  41. func NewEventDistributor(db *DB) *EventDistributor {
  42. e := new(EventDistributor)
  43. e.db = db
  44. e.subscribeChan = make(chan EventSubscribeRequest)
  45. return e
  46. }
  47. func (e *EventDistributor) Subscribe(req EventSubscribeRequest) chan PlayerEvent {
  48. e.subscribeChan <- req
  49. outbound := <-req.outbound
  50. return outbound
  51. }
  52. func (e *EventDistributor) OnlinePlayers() DBRefList {
  53. //replyChan := make(chan DBRefList)
  54. //e.whoChan <- replyChan
  55. //onlinePlayers := <-replyChan
  56. //return onlinePlayers
  57. return DBRefList{}
  58. }
  59. //[client runs quit command]
  60. //connection -> "quit" -> exec
  61. //exec -> EventTypeQuit -> connection
  62. //[connection stops doing connectiony stuff]
  63. //connection -> EventTypeTeardown -> exec
  64. //[exec stops, kills off goroutines]
  65. //exec -> EventTypeTeardownComplete -> Event Distributor
  66. //Event Distributor cleans up
  67. //[client dies]
  68. //connection -> EventTypeTeardown -> exec
  69. //[exec stops, kills off goroutines]
  70. //exec -> EventTypeTeardownComplete -> Event Distributor
  71. //Event Distributor cleans up
  72. func (e *EventDistributor) Run() {
  73. //ipcChans := make(map[int]playerRegistration)
  74. e.players = make(map[DBRef]*playerRegistration)
  75. outbound := make(chan PlayerEvent)
  76. forceContext := NewForceContext(e, e.db, outbound)
  77. forceInbound := forceContext.StartInboundChannel()
  78. for {
  79. select {
  80. case sub := <-e.subscribeChan:
  81. if _, exists := e.players[sub.playerID]; !exists {
  82. c := NewExecutionContext(sub.playerID, e, e.db, outbound)
  83. e.players[sub.playerID] = &playerRegistration{
  84. execInbound: c.StartInboundChannel(),
  85. execContext: c,
  86. connInbound: make(map[int]chan PlayerEvent),
  87. }
  88. }
  89. reg := e.players[sub.playerID]
  90. fmt.Println("connection id", sub.connectionID)
  91. reg.connInbound[sub.connectionID] = sub.inbound
  92. sub.outbound <- outbound
  93. case event := <-outbound: // message
  94. fmt.Printf("received message src:%d dest:%d type: %d\n", event.src, event.dst, event.messageType)
  95. destPlayer, validDest := e.players[event.dst]
  96. switch event.messageType {
  97. case EventTypeForce:
  98. forceInbound <- event
  99. case EventTypeTeardownComplete:
  100. if validDest {
  101. delete(e.players, event.dst)
  102. fmt.Println("EventTypeTeardownComplete")
  103. }
  104. case EventTypeOutput: // from context to connection
  105. if validDest {
  106. e.sendToAllConnections(destPlayer.connInbound, event)
  107. }
  108. //destPlayer.connInbound <- event
  109. case EventTypeQuit: // from context to connection
  110. if validDest {
  111. e.players[event.dst].connInbound[event.connectionID] <- event
  112. }
  113. case EventTypeTeardown: // comes from client when connection is dropped
  114. fmt.Println("received EventTypeTeardown")
  115. if validDest {
  116. close(e.players[event.dst].connInbound[event.connectionID])
  117. delete(e.players[event.dst].connInbound, event.connectionID)
  118. fmt.Println("EventTypeTeardown delete", event.connectionID, "len: ", len(e.players[event.dst].connInbound))
  119. if len(e.players[event.dst].connInbound) == 0 {
  120. fmt.Println("EventTypeTeardown killing exe")
  121. close(e.players[event.dst].execInbound) // closing the inbound channel signals the exec to stop
  122. }
  123. }
  124. case EventTypeCommand: // from connection to context
  125. if validDest {
  126. destPlayer.execInbound <- event
  127. }
  128. case EventTypePEmit:
  129. if validDest {
  130. destPlayer.execInbound <- event
  131. }
  132. case EventTypePage:
  133. if validDest {
  134. destPlayer.execInbound <- event
  135. }
  136. case EventTypeEmit:
  137. e.broadcastEvent(event, false)
  138. case EventTypeOEmit:
  139. e.broadcastEvent(event, true) // fix
  140. case EventTypeSay:
  141. e.broadcastEvent(event, true)
  142. case EventTypePose:
  143. e.broadcastEvent(event, false)
  144. case EventTypeWall:
  145. e.broadcastEvent(event, false)
  146. }
  147. }
  148. }
  149. }
  150. func (e *EventDistributor) sendToAllConnections(connections map[int]chan PlayerEvent, event PlayerEvent) {
  151. for _, channel := range connections {
  152. channel <- event
  153. }
  154. }
  155. func (e *EventDistributor) broadcastEvent(event PlayerEvent, omitSrc bool) {
  156. for playerID, player := range e.players {
  157. if !omitSrc || playerID != event.src {
  158. player.execInbound <- event
  159. }
  160. }
  161. }