No Description

bgpdump.go 6.4KB


  1. package internet
  2. import (
  3. "bufio"
  4. "compress/gzip"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "time"
  14. "github.com/davecgh/go-spew/spew"
  15. "github.com/garyburd/redigo/redis"
  16. "github.com/osrg/gobgp/packet/bgp"
  17. "github.com/osrg/gobgp/packet/mrt"
  18. )
  19. // RefreshBGPDump ensures that the latest dump available is the one which is installed.
  20. func RefreshBGPDump(conn redis.Conn) (int, error) {
  21. for _, b := range []BGPDump{
  22. {Date: time.Now()},
  23. {Date: time.Now().Add(-time.Duration(time.Hour * 24))},
  24. } {
  25. err := b.Download()
  26. if err != nil {
  27. return 0, err
  28. }
  29. if b.IsDownloaded() {
  30. return b.Import(conn)
  31. }
  32. }
  33. return 0, nil
  34. }
// BGPDump encapsulates downloading and importing of BGP dumps.
type BGPDump struct {
	// Date selects which dump this value refers to: it is formatted to build
	// the download URL, the local cache path and the day key used in redis.
	Date time.Time
}
  39. // Import stores the contents of a downloaded BGP dump into a redis server.
  40. // -1 is returned if the dump is alredy imported into redis.
  41. func (b *BGPDump) Import(conn redis.Conn) (int, error) {
  42. alreadyImported, err := redis.Bool(conn.Do("SISMEMBER", "i2a:imported_dates", b.day()))
  43. if err != nil {
  44. return 0, err
  45. }
  46. if alreadyImported {
  47. return -1, nil
  48. }
  49. n, err := b.parseBGPDump(conn)
  50. return n, err
  51. }
  52. // IsDownloaded returns true if the BGPDump archive is downloaded locally.
  53. func (b *BGPDump) IsDownloaded() bool {
  54. p := b.Path()
  55. if _, err := os.Stat(p); err == nil {
  56. return true
  57. }
  58. return false
  59. }
  60. // Download fetches an bgpdump archive from http://data.ris.ripe.net/rrc00.
  61. // A http 404 status code does not generate an error, the isDownloaded() to check success after fetching.
  62. // Download returns early with no error if the file already is downloaded to disk.
  63. func (b *BGPDump) Download() error {
  64. dt := b.Date
  65. dumpDir := b.dir()
  66. err := os.MkdirAll(dumpDir, 0777)
  67. if err != nil {
  68. return err
  69. }
  70. if b.IsDownloaded() {
  71. return nil
  72. }
  73. err = os.MkdirAll(filepath.Join(dataDir, "spool"), 0777)
  74. if err != nil {
  75. return err
  76. }
  77. tempFile, err := ioutil.TempFile(
  78. filepath.Join(dataDir, "spool"), b.day())
  79. if err != nil {
  80. return err
  81. }
  82. defer tempFile.Close()
  83. dlURL := fmt.Sprintf(
  84. "http://data.ris.ripe.net/rrc00/%s/bview.%s.%s.gz",
  85. dt.Format("2006.01"), b.day(), "0000")
  86. resp, err := http.Get(dlURL)
  87. if err != nil {
  88. return err
  89. }
  90. // Dumps from ??? to 2010-06-14 are named timestamped 2359 so do a check
  91. // for that if 0000 fails. For very early dumps the format is not static so those will fail.
  92. if resp.StatusCode == 404 && dt.Before(time.Date(2010, 06, 15, 0, 0, 0, 0, time.UTC)) {
  93. // log.Printf("trying different url, got 404 for %s", dlURL)
  94. dlURL = fmt.Sprintf(
  95. "http://data.ris.ripe.net/rrc00/%s/bview.%s.%s.gz",
  96. dt.Format("2006.01"), b.day(), "2359")
  97. resp, err = http.Get(dlURL)
  98. if err != nil {
  99. return err
  100. }
  101. }
  102. if resp.StatusCode != http.StatusOK {
  103. if resp.StatusCode == 404 {
  104. // log.Printf("Skipping download, got 404 for %s", dlURL)
  105. return nil
  106. }
  107. return fmt.Errorf("Got http status code %s response for %s", resp.Status, dlURL)
  108. }
  109. // log.Printf("Downloading %s\n", dlURL)
  110. defer resp.Body.Close()
  111. _, err = io.Copy(tempFile, resp.Body)
  112. if err != nil {
  113. return err
  114. }
  115. err = os.Rename(tempFile.Name(), b.Path())
  116. if err != nil {
  117. return err
  118. }
  119. return nil
  120. }
  121. func (b *BGPDump) parseBGPDump(conn redis.Conn) (int, error) {
  122. day := b.day()
  123. n := 0
  124. f, err := os.Open(b.Path())
  125. if err != nil {
  126. return 0, err
  127. }
  128. gzipReader, err := gzip.NewReader(f)
  129. if err != nil {
  130. return n, fmt.Errorf("couldn't create gzip reader: %v", err)
  131. }
  132. scanner := bufio.NewScanner(gzipReader)
  133. scanner.Split(mrt.SplitMrt)
  134. count := 0
  135. indexTableCount := 0
  136. entries:
  137. for scanner.Scan() {
  138. count++
  139. data := scanner.Bytes()
  140. hdr := &mrt.MRTHeader{}
  141. errh := hdr.DecodeFromBytes(data[:mrt.MRT_COMMON_HEADER_LEN])
  142. if err != nil {
  143. return 0, errh
  144. }
  145. msg, err := mrt.ParseMRTBody(hdr, data[mrt.MRT_COMMON_HEADER_LEN:])
  146. if err != nil {
  147. log.Printf("could not parse mrt body: %v", err)
  148. continue entries
  149. }
  150. if msg.Header.Type != mrt.TABLE_DUMPv2 {
  151. return 0, fmt.Errorf("unexpected message type: %d", msg.Header.Type)
  152. }
  153. switch mtrBody := msg.Body.(type) {
  154. case *mrt.PeerIndexTable:
  155. indexTableCount++
  156. if indexTableCount != 1 {
  157. return 0, fmt.Errorf("got >1 PeerIndexTable")
  158. }
  159. case *mrt.Rib:
  160. prefix := mtrBody.Prefix
  161. if len(mtrBody.Entries) < 0 {
  162. return 0, fmt.Errorf("no entries")
  163. }
  164. for _, entry := range mtrBody.Entries {
  165. attrs:
  166. for _, attr := range entry.PathAttributes {
  167. switch attr := attr.(type) {
  168. case *bgp.PathAttributeAsPath:
  169. if len(attr.Value) < 1 {
  170. continue attrs
  171. }
  172. if v, ok := attr.Value[0].(*bgp.As4PathParam); ok {
  173. if len(v.AS) < 0 {
  174. continue attrs
  175. }
  176. conn.Send("HSET", fmt.Sprintf("i2a:%s", prefix), day, v.AS[len(v.AS)-1])
  177. n++
  178. if n%10000 == 0 {
  179. err := conn.Flush()
  180. if err != nil {
  181. return 0, err
  182. }
  183. }
  184. continue entries
  185. }
  186. }
  187. }
  188. }
  189. default:
  190. return 0, fmt.Errorf("unsupported message %v %s", mtrBody, spew.Sdump(msg))
  191. }
  192. }
  193. conn.Send("SADD", "i2a:imported_dates", day)
  194. err = conn.Flush()
  195. if err != nil {
  196. return 0, err
  197. }
  198. return n, nil
  199. }
  200. func (b *BGPDump) parseBGPCSV(r io.Reader, conn redis.Conn) (int, error) {
  201. day := b.day()
  202. s := bufio.NewScanner(r)
  203. n := 0
  204. var asn string
  205. for s.Scan() {
  206. cols := strings.Split(s.Text(), "|")
  207. if len(cols) < 7 {
  208. return n, ParseError{
  209. Message: "too few columns",
  210. Path: filepath.Base(b.Path()),
  211. LineNum: n,
  212. Line: s.Text(),
  213. }
  214. }
  215. block := cols[5]
  216. if _, ok := asn12654blocks[block]; ok {
  217. asn = "12654"
  218. } else {
  219. asPath := cols[6]
  220. asns := strings.Split(asPath, " ")
  221. asn = asns[len(asns)-1]
  222. if asn == "" {
  223. return n, ParseError{
  224. Message: "no ASPATH data",
  225. Path: filepath.Base(b.Path()),
  226. LineNum: n,
  227. Line: s.Text(),
  228. }
  229. }
  230. }
  231. conn.Send("HSET", fmt.Sprintf("i2a:%s", block), day, asn)
  232. n++
  233. if n%10000 == 0 {
  234. err := conn.Flush()
  235. if err != nil {
  236. return 0, err
  237. }
  238. }
  239. }
  240. conn.Send("SADD", "i2a:imported_dates", day)
  241. err := conn.Flush()
  242. if err != nil {
  243. return 0, err
  244. }
  245. return n, nil
  246. }
  247. // Path returns the absolute path to the target archive dump download file.
  248. func (b *BGPDump) Path() string {
  249. return filepath.Join(
  250. b.dir(), fmt.Sprintf("%s.gz", b.Date.Format("20060102")))
  251. }
  252. func (b *BGPDump) dir() string {
  253. return filepath.Join(
  254. dataDir, "cache", b.Date.Format("200601"))
  255. }
  256. func (b *BGPDump) day() string {
  257. return b.Date.Format("20060102")
  258. }