No Description

event_distributor.go 4.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package funmow
  2. const (
  3. EventTypeEmit = iota // everyone in the containing object hears it,
  4. EventTypeOEmit // everyone in the containing object except the player hears it
  5. EventTypePEmit // only the player hears it
  6. EventTypeWall // broadcast message
  7. EventTypePage
  8. EventTypeSay
  9. EventTypePose
  10. EventTypeCommand //7
  11. EventTypeOutput //8
  12. EventTypeQuit
  13. EventTypeTeardown
  14. EventTypeTeardownComplete
  15. EventTypeForce
  16. )
  17. type PlayerEvent struct {
  18. connectionID int
  19. src DBRef
  20. dst DBRef
  21. messageType int
  22. message string
  23. }
  24. type playerRegistration struct {
  25. execContext *ExecutionContext
  26. connInbound map[int]chan PlayerEvent
  27. execInbound chan PlayerEvent
  28. }
  29. type EventSubscribeRequest struct {
  30. connectionID int
  31. playerID DBRef
  32. inbound chan PlayerEvent
  33. outbound chan chan PlayerEvent
  34. }
  35. type EventDistributor struct {
  36. subscribeChan chan EventSubscribeRequest
  37. db *DB
  38. players map[DBRef]*playerRegistration
  39. }
  40. func NewEventDistributor(db *DB) *EventDistributor {
  41. e := new(EventDistributor)
  42. e.db = db
  43. e.subscribeChan = make(chan EventSubscribeRequest)
  44. return e
  45. }
  46. func (e *EventDistributor) Subscribe(req EventSubscribeRequest) chan PlayerEvent {
  47. e.subscribeChan <- req
  48. outbound := <-req.outbound
  49. return outbound
  50. }
  51. func (e *EventDistributor) OnlinePlayers() DBRefList {
  52. //replyChan := make(chan DBRefList)
  53. //e.whoChan <- replyChan
  54. //onlinePlayers := <-replyChan
  55. //return onlinePlayers
  56. return DBRefList{}
  57. }
  58. //[client runs quit command]
  59. //connection -> "quit" -> exec
  60. //exec -> EventTypeQuit -> connection
  61. //[connection stops doing connectiony stuff]
  62. //connection -> EventTypeTeardown -> exec
  63. //[exec stops, kills off goroutines]
  64. //exec -> EventTypeTeardownComplete -> Event Distributor
  65. //Event Distributor cleans up
  66. //[client dies]
  67. //connection -> EventTypeTeardown -> exec
  68. //[exec stops, kills off goroutines]
  69. //exec -> EventTypeTeardownComplete -> Event Distributor
  70. //Event Distributor cleans up
  71. func (e *EventDistributor) Run() {
  72. //ipcChans := make(map[int]playerRegistration)
  73. e.players = make(map[DBRef]*playerRegistration)
  74. outbound := make(chan PlayerEvent)
  75. forceContext := NewForceContext(e, e.db, outbound)
  76. forceInbound := forceContext.StartInboundChannel()
  77. for {
  78. select {
  79. case sub := <-e.subscribeChan:
  80. if _, exists := e.players[sub.playerID]; !exists {
  81. c := NewExecutionContext(sub.playerID, e, e.db, outbound)
  82. e.players[sub.playerID] = &playerRegistration{
  83. execInbound: c.StartInboundChannel(),
  84. execContext: c,
  85. connInbound: make(map[int]chan PlayerEvent),
  86. }
  87. }
  88. reg := e.players[sub.playerID]
  89. reg.connInbound[sub.connectionID] = sub.inbound
  90. sub.outbound <- outbound
  91. case event := <-outbound: // message
  92. //fmt.Printf("received message src:%d dest:%d type: %d\n", event.src, event.dst, event.messageType)
  93. destPlayer, validDest := e.players[event.dst]
  94. switch event.messageType {
  95. case EventTypeForce:
  96. forceInbound <- event
  97. case EventTypeTeardownComplete:
  98. if validDest {
  99. delete(e.players, event.dst)
  100. }
  101. case EventTypeOutput: // from context to connection
  102. if validDest {
  103. e.sendToAllConnections(destPlayer.connInbound, event)
  104. }
  105. //destPlayer.connInbound <- event
  106. case EventTypeQuit: // from context to connection
  107. if validDest {
  108. e.players[event.dst].connInbound[event.connectionID] <- event
  109. }
  110. case EventTypeTeardown: // comes from client when connection is dropped
  111. if validDest {
  112. close(e.players[event.dst].connInbound[event.connectionID])
  113. delete(e.players[event.dst].connInbound, event.connectionID)
  114. if len(e.players[event.dst].connInbound) == 0 {
  115. close(e.players[event.dst].execInbound) // closing the inbound channel signals the exec to stop
  116. }
  117. }
  118. case EventTypeCommand: // from connection to context
  119. if validDest {
  120. destPlayer.execInbound <- event
  121. }
  122. case EventTypePEmit:
  123. if validDest {
  124. destPlayer.execInbound <- event
  125. }
  126. case EventTypePage:
  127. if validDest {
  128. destPlayer.execInbound <- event
  129. }
  130. case EventTypeEmit:
  131. e.broadcastEvent(event, false)
  132. case EventTypeOEmit:
  133. e.broadcastEvent(event, true) // fix
  134. case EventTypeSay:
  135. e.broadcastEvent(event, true)
  136. case EventTypePose:
  137. e.broadcastEvent(event, false)
  138. case EventTypeWall:
  139. e.broadcastEvent(event, false)
  140. }
  141. }
  142. }
  143. }
  144. func (e *EventDistributor) sendToAllConnections(connections map[int]chan PlayerEvent, event PlayerEvent) {
  145. for _, channel := range connections {
  146. channel <- event
  147. }
  148. }
  149. func (e *EventDistributor) broadcastEvent(event PlayerEvent, omitSrc bool) {
  150. for playerID, player := range e.players {
  151. if !omitSrc || playerID != event.src {
  152. player.execInbound <- event
  153. }
  154. }
  155. }