Нема описа

event_distributor.go 5.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. package funmow
  2. const (
  3. EventTypeEmit = iota // everyone in the containing object hears it,
  4. EventTypeOEmit // everyone in the containing object except the player hears it
  5. EventTypePEmit // only the player hears it
  6. EventTypeWall // broadcast message
  7. EventTypePage
  8. EventTypeSay
  9. EventTypePose
  10. EventTypeCommand //7
  11. EventTypeOutput //8
  12. EventTypeQuit
  13. EventTypeTeardown
  14. EventTypeTeardownComplete
  15. EventTypeForce
  16. EventTypeLogin
  17. EventTypeSystem
  18. )
  19. type PlayerEvent struct {
  20. connectionID int
  21. src DBRef
  22. dst DBRef
  23. messageType int
  24. message string
  25. }
  26. type playerRegistration struct {
  27. execContext *ExecutionContext
  28. connInbound map[int]chan PlayerEvent
  29. execInbound chan PlayerEvent
  30. }
  31. type EventSubscribeRequest struct {
  32. connectionID int
  33. playerID DBRef
  34. inbound chan PlayerEvent
  35. outbound chan chan PlayerEvent
  36. }
  37. type EventDistributor struct {
  38. subscribeChan chan EventSubscribeRequest
  39. db *DB
  40. players map[DBRef]*playerRegistration
  41. whoChan chan chan DBRefList
  42. }
  43. func NewEventDistributor(db *DB) *EventDistributor {
  44. e := new(EventDistributor)
  45. e.db = db
  46. e.subscribeChan = make(chan EventSubscribeRequest)
  47. e.whoChan = make(chan chan DBRefList)
  48. return e
  49. }
  50. func (e *EventDistributor) Subscribe(req EventSubscribeRequest) chan PlayerEvent {
  51. e.subscribeChan <- req
  52. outbound := <-req.outbound
  53. return outbound
  54. }
  55. func (e *EventDistributor) OnlinePlayers() DBRefList {
  56. replyChan := make(chan DBRefList)
  57. e.whoChan <- replyChan
  58. onlinePlayers := <-replyChan
  59. return onlinePlayers
  60. }
  61. //[client runs quit command]
  62. //connection -> "quit" -> exec
  63. //exec -> EventTypeQuit -> connection
  64. //[connection stops doing connectiony stuff]
  65. //connection -> EventTypeTeardown -> exec
  66. //[exec stops, kills off goroutines]
  67. //exec -> EventTypeTeardownComplete -> Event Distributor
  68. //Event Distributor cleans up
  69. //[client dies]
  70. //connection -> EventTypeTeardown -> exec
  71. //[exec stops, kills off goroutines]
  72. //exec -> EventTypeTeardownComplete -> Event Distributor
  73. //Event Distributor cleans up
  74. func (e *EventDistributor) Run() {
  75. //ipcChans := make(map[int]playerRegistration)
  76. e.players = make(map[DBRef]*playerRegistration)
  77. outbound := make(chan PlayerEvent)
  78. outboundBuffered := EventBuffer(outbound)
  79. forceContext := NewForceContext(e, e.db, outbound)
  80. forceInbound := forceContext.StartInboundChannel()
  81. for {
  82. select {
  83. case replyChan := <-e.whoChan:
  84. onlinePlayers := make(DBRefList, 0)
  85. for playerID, _ := range e.players {
  86. onlinePlayers = append(onlinePlayers, playerID)
  87. }
  88. replyChan <- onlinePlayers
  89. case sub := <-e.subscribeChan:
  90. if _, exists := e.players[sub.playerID]; !exists {
  91. c := NewExecutionContext(sub.playerID, e, e.db, outbound)
  92. e.players[sub.playerID] = &playerRegistration{
  93. execInbound: c.StartInboundChannel(),
  94. execContext: c,
  95. connInbound: make(map[int]chan PlayerEvent),
  96. }
  97. // mark player as online, and tell everyone!
  98. e.players[sub.playerID].execInbound <- PlayerEvent{messageType: EventTypeLogin}
  99. }
  100. reg := e.players[sub.playerID]
  101. reg.connInbound[sub.connectionID] = sub.inbound
  102. sub.outbound <- outbound
  103. case event := <-outboundBuffered: // message
  104. //fmt.Printf("received message src:%d dest:%d type: %d\n", event.src, event.dst, event.messageType)
  105. destPlayer, validDest := e.players[event.dst]
  106. switch event.messageType {
  107. case EventTypeForce:
  108. forceInbound <- event
  109. case EventTypeTeardownComplete:
  110. if validDest {
  111. delete(e.players, event.dst)
  112. }
  113. case EventTypeOutput: // from context to connection
  114. if validDest {
  115. e.sendToAllConnections(destPlayer.connInbound, event)
  116. }
  117. //destPlayer.connInbound <- event
  118. case EventTypeQuit: // from context to connection
  119. if validDest {
  120. e.players[event.dst].connInbound[event.connectionID] <- event
  121. }
  122. case EventTypeTeardown: // comes from client when connection is dropped
  123. if validDest {
  124. close(e.players[event.dst].connInbound[event.connectionID])
  125. delete(e.players[event.dst].connInbound, event.connectionID)
  126. if len(e.players[event.dst].connInbound) == 0 {
  127. close(e.players[event.dst].execInbound) // closing the inbound channel signals the exec to stop
  128. }
  129. }
  130. case EventTypeCommand: // from connection to context
  131. if validDest {
  132. destPlayer.execInbound <- event
  133. }
  134. case EventTypePEmit:
  135. if validDest {
  136. destPlayer.execInbound <- event
  137. }
  138. case EventTypePage:
  139. if validDest {
  140. destPlayer.execInbound <- event
  141. }
  142. case EventTypeEmit:
  143. e.broadcastEvent(event, false)
  144. case EventTypeOEmit:
  145. e.broadcastEvent(event, true) // fix
  146. case EventTypeSay:
  147. e.broadcastEvent(event, true)
  148. case EventTypePose:
  149. e.broadcastEvent(event, false)
  150. case EventTypeWall:
  151. e.broadcastEvent(event, false)
  152. case EventTypeSystem:
  153. e.broadcastEvent(event, false)
  154. }
  155. }
  156. }
  157. }
  158. func (e *EventDistributor) sendToAllConnections(connections map[int]chan PlayerEvent, event PlayerEvent) {
  159. for _, channel := range connections {
  160. channel <- event
  161. }
  162. }
  163. func (e *EventDistributor) broadcastEvent(event PlayerEvent, omitSrc bool) {
  164. for playerID, player := range e.players {
  165. if !omitSrc || playerID != event.src {
  166. player.execInbound <- event
  167. }
  168. }
  169. }