Без опису

event_distributor.go 4.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package funmow
  // Event types carried in PlayerEvent.messageType.
  //
  // The values are assigned by iota in declaration order (EventTypeEmit is 0,
  // EventTypeCommand is 7, EventTypeOutput is 8), so inserting or reordering
  // entries changes every value that follows.
  const (
  	// EventTypeEmit: everyone in the containing object hears it.
  	EventTypeEmit = iota
  	// EventTypeOEmit: everyone in the containing object except the player hears it.
  	EventTypeOEmit
  	// EventTypePEmit: only the player hears it.
  	EventTypePEmit
  	// EventTypeWall: broadcast message.
  	EventTypeWall
  	// EventTypePage is delivered to the destination player's execution context.
  	EventTypePage
  	// EventTypeSay is broadcast to every registered player except the source.
  	EventTypeSay
  	// EventTypePose is broadcast to every registered player.
  	EventTypePose
  	// EventTypeCommand travels from a connection to the player's execution context (value 7).
  	EventTypeCommand
  	// EventTypeOutput travels from an execution context to the player's connections (value 8).
  	EventTypeOutput
  	// EventTypeQuit travels from an execution context to a single connection.
  	EventTypeQuit
  	// EventTypeTeardown comes from the client side when a connection is dropped.
  	EventTypeTeardown
  	// EventTypeTeardownComplete is reported by an execution context once it has stopped.
  	EventTypeTeardownComplete
  	// EventTypeForce is handed to the force context.
  	EventTypeForce
  	// EventTypeLogin is sent to a freshly created execution context.
  	EventTypeLogin
  	// EventTypeSystem is broadcast to every registered player.
  	EventTypeSystem
  )
  19. type PlayerEvent struct {
  20. connectionID int
  21. src DBRef
  22. dst DBRef
  23. messageType int
  24. message string
  25. }
  26. type playerRegistration struct {
  27. execContext *ExecutionContext
  28. connInbound map[int]chan PlayerEvent
  29. execInbound chan PlayerEvent
  30. }
  31. type EventSubscribeRequest struct {
  32. connectionID int
  33. playerID DBRef
  34. inbound chan PlayerEvent
  35. outbound chan chan PlayerEvent
  36. }
  37. type EventDistributor struct {
  38. subscribeChan chan EventSubscribeRequest
  39. db *DB
  40. players map[DBRef]*playerRegistration
  41. }
  42. func NewEventDistributor(db *DB) *EventDistributor {
  43. e := new(EventDistributor)
  44. e.db = db
  45. e.subscribeChan = make(chan EventSubscribeRequest)
  46. return e
  47. }
  48. func (e *EventDistributor) Subscribe(req EventSubscribeRequest) chan PlayerEvent {
  49. e.subscribeChan <- req
  50. outbound := <-req.outbound
  51. return outbound
  52. }
  53. func (e *EventDistributor) OnlinePlayers() DBRefList {
  54. //replyChan := make(chan DBRefList)
  55. //e.whoChan <- replyChan
  56. //onlinePlayers := <-replyChan
  57. //return onlinePlayers
  58. return DBRefList{}
  59. }
  // Teardown protocol
  //
  // Case 1: the client runs the quit command.
  //   connection -> "quit" command -> exec
  //   exec -> EventTypeQuit -> connection
  //   [connection stops servicing the client]
  //   connection -> EventTypeTeardown -> exec
  //   [exec stops and shuts down its goroutines]
  //   exec -> EventTypeTeardownComplete -> Event Distributor
  //   Event Distributor cleans up
  //
  // Case 2: the client dies.
  //   connection -> EventTypeTeardown -> exec
  //   [exec stops and shuts down its goroutines]
  //   exec -> EventTypeTeardownComplete -> Event Distributor
  //   Event Distributor cleans up
  73. func (e *EventDistributor) Run() {
  74. //ipcChans := make(map[int]playerRegistration)
  75. e.players = make(map[DBRef]*playerRegistration)
  76. outbound := make(chan PlayerEvent)
  77. forceContext := NewForceContext(e, e.db, outbound)
  78. forceInbound := forceContext.StartInboundChannel()
  79. for {
  80. select {
  81. case sub := <-e.subscribeChan:
  82. if _, exists := e.players[sub.playerID]; !exists {
  83. c := NewExecutionContext(sub.playerID, e, e.db, outbound)
  84. e.players[sub.playerID] = &playerRegistration{
  85. execInbound: c.StartInboundChannel(),
  86. execContext: c,
  87. connInbound: make(map[int]chan PlayerEvent),
  88. }
  89. // mark player as online, and tell everyone!
  90. e.players[sub.playerID].execInbound <- PlayerEvent{messageType: EventTypeLogin}
  91. }
  92. reg := e.players[sub.playerID]
  93. reg.connInbound[sub.connectionID] = sub.inbound
  94. sub.outbound <- outbound
  95. case event := <-outbound: // message
  96. //fmt.Printf("received message src:%d dest:%d type: %d\n", event.src, event.dst, event.messageType)
  97. destPlayer, validDest := e.players[event.dst]
  98. switch event.messageType {
  99. case EventTypeForce:
  100. forceInbound <- event
  101. case EventTypeTeardownComplete:
  102. if validDest {
  103. delete(e.players, event.dst)
  104. }
  105. case EventTypeOutput: // from context to connection
  106. if validDest {
  107. e.sendToAllConnections(destPlayer.connInbound, event)
  108. }
  109. //destPlayer.connInbound <- event
  110. case EventTypeQuit: // from context to connection
  111. if validDest {
  112. e.players[event.dst].connInbound[event.connectionID] <- event
  113. }
  114. case EventTypeTeardown: // comes from client when connection is dropped
  115. if validDest {
  116. close(e.players[event.dst].connInbound[event.connectionID])
  117. delete(e.players[event.dst].connInbound, event.connectionID)
  118. if len(e.players[event.dst].connInbound) == 0 {
  119. close(e.players[event.dst].execInbound) // closing the inbound channel signals the exec to stop
  120. }
  121. }
  122. case EventTypeCommand: // from connection to context
  123. if validDest {
  124. destPlayer.execInbound <- event
  125. }
  126. case EventTypePEmit:
  127. if validDest {
  128. destPlayer.execInbound <- event
  129. }
  130. case EventTypePage:
  131. if validDest {
  132. destPlayer.execInbound <- event
  133. }
  134. case EventTypeEmit:
  135. e.broadcastEvent(event, false)
  136. case EventTypeOEmit:
  137. e.broadcastEvent(event, true) // fix
  138. case EventTypeSay:
  139. e.broadcastEvent(event, true)
  140. case EventTypePose:
  141. e.broadcastEvent(event, false)
  142. case EventTypeWall:
  143. e.broadcastEvent(event, false)
  144. case EventTypeSystem:
  145. e.broadcastEvent(event, false)
  146. }
  147. }
  148. }
  149. }
  150. func (e *EventDistributor) sendToAllConnections(connections map[int]chan PlayerEvent, event PlayerEvent) {
  151. for _, channel := range connections {
  152. channel <- event
  153. }
  154. }
  155. func (e *EventDistributor) broadcastEvent(event PlayerEvent, omitSrc bool) {
  156. for playerID, player := range e.players {
  157. if !omitSrc || playerID != event.src {
  158. player.execInbound <- event
  159. }
  160. }
  161. }