Переглянути джерело

Multiple changes

Fixes #6
Fixes #7
Thomas Frössman 9 роки тому
джерело
коміт
0187604f50
8 змінених файлів з 189 додано та 101 видалено
  1. 5
    6
      asn2asd.go
  2. 61
    37
      bgpdump.go
  3. 16
    8
      bgpdump_test.go
  4. 34
    20
      cidrreport.go
  5. 14
    4
      cidrreport_test.go
  6. 28
    10
      cmd/dev/dev.go
  7. 12
    0
      internet.go
  8. 19
    16
      ip2asn.go

+ 5
- 6
asn2asd.go Переглянути файл

@@ -26,19 +26,18 @@ func (i *ASN2ASDescClient) importedDates() ([]string, error) {
26 26
 }
27 27
 
28 28
 // Current returns the latest known result for an IP2ASN lookup.
29
-func (i *ASN2ASDescClient) Current(ASN int) string {
29
+func (i *ASN2ASDescClient) Current(ASN int) (string, error) {
30 30
 	allDates, err := i.importedDates()
31 31
 	if err != nil {
32
-		return ""
32
+		return "", err
33 33
 	}
34 34
 	if len(allDates) < 0 {
35
-		return ""
35
+		return "", err
36 36
 	}
37 37
 	current := allDates[len(allDates)-1]
38 38
 	result, err := redis.String(i.conn.Do("HGET", fmt.Sprintf("asd:%d", ASN), current))
39 39
 	if err != nil {
40
-		return ""
40
+		return "", err
41 41
 	}
42
-
43
-	return result
42
+	return result, nil
44 43
 }

+ 61
- 37
bgpdump.go Переглянути файл

@@ -2,6 +2,7 @@ package internet
2 2
 
3 3
 import (
4 4
 	"bufio"
5
+	"errors"
5 6
 	"fmt"
6 7
 	"io"
7 8
 	"io/ioutil"
@@ -11,30 +12,26 @@ import (
11 12
 	"os/exec"
12 13
 	"path/filepath"
13 14
 	"strings"
14
-	"sync"
15 15
 	"time"
16 16
 
17 17
 	"github.com/garyburd/redigo/redis"
18 18
 )
19 19
 
20 20
 // RefreshBGPDump ensures that the latest dump available is the one which is installed.
21
-func RefreshBGPDump(conn redis.Conn) error {
21
+func RefreshBGPDump(conn redis.Conn) (int, error) {
22 22
 	for _, b := range []BGPDump{
23 23
 		{Date: time.Now()},
24 24
 		{Date: time.Now().Add(-time.Duration(time.Hour * 24))},
25 25
 	} {
26 26
 		err := b.Download()
27 27
 		if err != nil {
28
-			return err
28
+			return 0, err
29 29
 		}
30 30
 		if b.IsDownloaded() {
31
-			if err := b.Import(conn); err != nil {
32
-				return err
33
-			}
34
-			return nil
31
+			return b.Import(conn)
35 32
 		}
36 33
 	}
37
-	return nil
34
+	return 0, nil
38 35
 }
39 36
 
40 37
 // BGPDump encapuslates downloading and importing of BGP dumps.
@@ -43,35 +40,60 @@ type BGPDump struct {
43 40
 }
44 41
 
45 42
 // Import stores the contents of a downloaded BGP dump into a redis server.
46
-func (b *BGPDump) Import(conn redis.Conn) error {
47
-
43
+// -1 is returned if the dump is alredy imported into redis.
44
+func (b *BGPDump) Import(conn redis.Conn) (int, error) {
48 45
 	alreadyImported, err := redis.Bool(conn.Do("SISMEMBER", "i2a:imported_dates", b.day()))
49 46
 	if err != nil {
50
-		panic(err)
47
+		return 0, err
51 48
 	}
52 49
 	if alreadyImported {
53
-		return nil
50
+		return -1, nil
54 51
 	}
55 52
 	c := exec.Command("bgpdump", "-m", b.Path())
56
-
57 53
 	stdout, err := c.StdoutPipe()
58 54
 	if err != nil {
59
-		return err
55
+		return 0, err
56
+	}
60 57
 
58
+	type nErr struct {
59
+		n   int
60
+		err error
61 61
 	}
62
-	var wg sync.WaitGroup
63
-	wg.Add(1)
62
+
63
+	parseC := make(chan nErr)
64 64
 	go func(r io.Reader) {
65
-		defer wg.Done()
66
-		b.parseBGPCSV(r, conn)
65
+		defer func() {
66
+			if err := recover(); err != nil {
67
+				log.Println(err)
68
+				switch err.(type) {
69
+				case error:
70
+					parseC <- nErr{
71
+						err: err.(error),
72
+					}
73
+				default:
74
+					parseC <- nErr{err: errors.New("unknown error")}
75
+				}
76
+			}
77
+		}()
78
+		n, err := b.parseBGPCSV(r, conn)
79
+		parseC <- nErr{n, err}
67 80
 	}(stdout)
68
-	err = c.Run()
69
-	if err != nil {
70
-		return err
81
+
82
+	execC := make(chan error)
83
+	go func() {
84
+		err = c.Run()
85
+		if err != nil {
86
+			execC <- err
87
+		}
88
+	}()
89
+
90
+	select {
91
+	case err := <-execC:
92
+		return 0, err
93
+	case ne := <-parseC:
94
+		return ne.n, ne.err
71 95
 	}
72
-	wg.Wait()
73 96
 
74
-	return nil
75 97
 }
76 98
 
77 99
 // IsDownloaded returns true if the BGPDump archive is downloaded locally.
@@ -152,18 +174,20 @@ func (b *BGPDump) Download() error {
152 174
 
153 175
 }
154 176
 
155
-func (b *BGPDump) parseBGPCSV(r io.Reader, conn redis.Conn) error {
177
+func (b *BGPDump) parseBGPCSV(r io.Reader, conn redis.Conn) (int, error) {
156 178
 	day := b.day()
157
-
158
-	start := time.Now()
159 179
 	s := bufio.NewScanner(r)
160 180
 	n := 0
161 181
 	var asn string
162 182
 	for s.Scan() {
163 183
 		cols := strings.Split(s.Text(), "|")
164 184
 		if len(cols) < 7 {
165
-			log.Printf("Too few columns in %s:%d: %s", filepath.Base(b.Path()), n, s.Text())
166
-			continue
185
+			return n, ParseError{
186
+				Message: "too few columns",
187
+				Path:    filepath.Base(b.Path()),
188
+				LineNum: n,
189
+				Line:    s.Text(),
190
+			}
167 191
 		}
168 192
 		block := cols[5]
169 193
 
@@ -174,8 +198,12 @@ func (b *BGPDump) parseBGPCSV(r io.Reader, conn redis.Conn) error {
174 198
 			asns := strings.Split(asPath, " ")
175 199
 			asn = asns[len(asns)-1]
176 200
 			if asn == "" {
177
-				log.Printf("No ASPATH data for %s:%d: %s", filepath.Base(b.Path()), n, s.Text())
178
-				continue
201
+				return n, ParseError{
202
+					Message: "no ASPATH data",
203
+					Path:    filepath.Base(b.Path()),
204
+					LineNum: n,
205
+					Line:    s.Text(),
206
+				}
179 207
 			}
180 208
 		}
181 209
 		conn.Send("HSET", fmt.Sprintf("i2a:%s", block), day, asn)
@@ -183,20 +211,16 @@ func (b *BGPDump) parseBGPCSV(r io.Reader, conn redis.Conn) error {
183 211
 		if n%10000 == 0 {
184 212
 			err := conn.Flush()
185 213
 			if err != nil {
186
-				panic(err)
214
+				return 0, err
187 215
 			}
188 216
 		}
189 217
 	}
190 218
 	conn.Send("SADD", "i2a:imported_dates", day)
191 219
 	err := conn.Flush()
192 220
 	if err != nil {
193
-		panic(err)
221
+		return 0, err
194 222
 	}
195
-
196
-	log.Printf("Imported %d rows from %s in %s",
197
-		n, filepath.Base(b.Path()), time.Since(start))
198
-
199
-	return nil
223
+	return n, nil
200 224
 }
201 225
 
202 226
 // Path returns the absolute path to the target archive dump download file.

+ 16
- 8
bgpdump_test.go Переглянути файл

@@ -3,7 +3,6 @@ package internet
3 3
 import (
4 4
 	"os"
5 5
 	"testing"
6
-
7 6
 	"time"
8 7
 
9 8
 	"github.com/garyburd/redigo/redis"
@@ -26,8 +25,13 @@ func TestParseDump(t *testing.T) {
26 25
 	}
27 26
 	conn := redigomock.NewConn()
28 27
 	b := BGPDump{Date: time.Date(2015, 1, 1, 0, 0, 0, 0, time.UTC)}
29
-	b.parseBGPCSV(file, conn)
30
-
28
+	n, err := b.parseBGPCSV(file, conn)
29
+	if err != nil {
30
+		panic(err)
31
+	}
32
+	if n != 801 {
33
+		t.Fatalf("expected 801 imported entries, got %d", n)
34
+	}
31 35
 }
32 36
 
33 37
 func TestParseBrokenDump(t *testing.T) {
@@ -35,12 +39,16 @@ func TestParseBrokenDump(t *testing.T) {
35 39
 	if err != nil {
36 40
 		panic(err)
37 41
 	}
38
-
39 42
 	conn := redigomock.NewConn()
40 43
 	b := BGPDump{Date: time.Date(2015, 1, 2, 0, 0, 0, 0, time.UTC)}
41
-	err = b.parseBGPCSV(file, conn)
42
-	if err != nil {
43
-		panic(err)
44
+	n, err := b.parseBGPCSV(file, conn)
45
+	if err == nil {
46
+		t.Fatalf("expected parse error")
47
+	}
48
+	if n != 5 {
49
+		t.Fatalf("expected parse error on line 5 but %d lines were read", n)
50
+	}
51
+	if _, ok := err.(ParseError); !ok {
52
+		t.Fatalf("expected parse error, got %s", err)
44 53
 	}
45
-
46 54
 }

+ 34
- 20
cidrreport.go Переглянути файл

@@ -1,7 +1,6 @@
1 1
 package internet
2 2
 
3 3
 import (
4
-	"errors"
5 4
 	"fmt"
6 5
 	"io"
7 6
 	"io/ioutil"
@@ -53,39 +52,42 @@ func (b *CIDRReport) IsDownloaded() bool {
53 52
 }
54 53
 
55 54
 // Import stores the contents of a downloaded BGP dump into a redis server.
56
-func (b *CIDRReport) Import(conn redis.Conn) error {
55
+// -1 is returned if the dump is alredy imported into redis.
56
+func (b *CIDRReport) Import(conn redis.Conn) (int, error) {
57 57
 
58 58
 	alreadyImported, err := redis.Bool(conn.Do("SISMEMBER", "asd:imported_dates", b.day()))
59 59
 	if err != nil {
60
-		return err
60
+		return 0, err
61 61
 	}
62 62
 	if alreadyImported {
63
-		return nil
63
+		return -1, nil
64 64
 	}
65 65
 
66 66
 	file, err := os.Open(b.Path())
67 67
 	if err != nil {
68
-		return err
68
+		return 0, err
69 69
 	}
70 70
 	n := 0
71 71
 	day := b.day()
72
-	err = parseReport(file, func(asd *ASDescription) {
72
+	err = parseReport(file, func(asd *ASDescription) error {
73 73
 		conn.Send("HSET", fmt.Sprintf("asd:%d", asd.ASN), day,
74 74
 			fmt.Sprintf("%s, %s", asd.Description, asd.CountryCode))
75 75
 		n++
76 76
 		if n%10000 == 0 {
77 77
 			err := conn.Flush()
78 78
 			if err != nil {
79
-				panic(err)
79
+				return err
80
+
80 81
 			}
81 82
 		}
83
+		return nil
82 84
 	})
83 85
 	conn.Send("SADD", "asd:imported_dates", day)
84 86
 	err = conn.Flush()
85 87
 	if err != nil {
86
-		panic(err)
88
+		return 0, err
87 89
 	}
88
-	return nil
90
+	return n, nil
89 91
 }
90 92
 
91 93
 // Download fetches http://www.cidr-report.org/as2.0/autnums.html and stores it
@@ -134,26 +136,23 @@ func (b *CIDRReport) Download() error {
134 136
 }
135 137
 
136 138
 // RefreshCIDRReport ensures that the latest dump available is the one which is installed.
137
-func RefreshCIDRReport(conn redis.Conn) error {
139
+func RefreshCIDRReport(conn redis.Conn) (int, error) {
138 140
 	for _, b := range []CIDRReport{
139 141
 		{Date: time.Now()},
140 142
 		{Date: time.Now().Add(-time.Duration(time.Hour * 24))},
141 143
 	} {
142 144
 		err := b.Download()
143 145
 		if err != nil {
144
-			return err
146
+			return 0, err
145 147
 		}
146 148
 		if b.IsDownloaded() {
147
-			if err := b.Import(conn); err != nil {
148
-				return err
149
-			}
150
-			return nil
149
+			return b.Import(conn)
151 150
 		}
152 151
 	}
153
-	return nil
152
+	return 0, nil
154 153
 }
155 154
 
156
-func parseReport(r io.Reader, emitter func(*ASDescription)) error {
155
+func parseReport(r io.Reader, emitter func(*ASDescription) error) error {
157 156
 	z := html.NewTokenizer(r)
158 157
 	n := 0
159 158
 	depth := 0
@@ -169,7 +168,13 @@ loop:
169 168
 				desc := strings.TrimSpace(string(z.Text()))
170 169
 				ccpos := strings.LastIndex(desc, ",")
171 170
 				if ccpos == -1 {
172
-					return fmt.Errorf("Could not parse country code from %d %s", asn, desc)
171
+					return ParseError{
172
+						Message: "Could not parse country code",
173
+						Path:    "cidrreport",
174
+						LineNum: n,
175
+						Line:    fmt.Sprintf("asn:%s desc:%s", asn, desc),
176
+					}
177
+
173 178
 				}
174 179
 				emitter(&ASDescription{
175 180
 					ASN:         *asn,
@@ -183,7 +188,11 @@ loop:
183 188
 				var err error
184 189
 				i, err := strconv.Atoi(strings.TrimSpace(asnstr))
185 190
 				if err != nil {
186
-					return err
191
+					return ParseError{
192
+						Message: err.Error(),
193
+						Path:    "cidrreport",
194
+						LineNum: n,
195
+					}
187 196
 				}
188 197
 				asn = &i
189 198
 			}
@@ -199,7 +208,12 @@ loop:
199 208
 		}
200 209
 	}
201 210
 	if n == 0 {
202
-		return errors.New("No entries found, the parsing failed")
211
+		return ParseError{
212
+			Message: "no entries found",
213
+			Path:    "cidrreport",
214
+			LineNum: n,
215
+		}
216
+
203 217
 	}
204 218
 	return nil
205 219
 }

+ 14
- 4
cidrreport_test.go Переглянути файл

@@ -8,8 +8,9 @@ import (
8 8
 func TestParseCidrreport(t *testing.T) {
9 9
 	file, err := os.Open("testdata/autnums.sample.txt")
10 10
 	n := 0
11
-	err = parseReport(file, func(*ASDescription) {
11
+	err = parseReport(file, func(*ASDescription) error {
12 12
 		n++
13
+		return nil
13 14
 	})
14 15
 	if err != nil {
15 16
 		t.Error(err)
@@ -21,8 +22,17 @@ func TestParseCidrreport(t *testing.T) {
21 22
 
22 23
 func TestParseBrokenCidrreport(t *testing.T) {
23 24
 	file, err := os.Open("testdata/autnums.invalid.sample.txt")
24
-	err = parseReport(file, func(*ASDescription) {})
25
-	if err == nil {
26
-		t.Errorf("Parse error was expected")
25
+	err = parseReport(file, func(*ASDescription) error { return nil })
26
+	if _, ok := err.(ParseError); !ok {
27
+		t.Fatalf("expected parse error, got %s", err)
28
+	}
29
+}
30
+
31
+func TestParseBrokenLineCidrreport(t *testing.T) {
32
+	file, err := os.Open("testdata/autnums.invalid.sample2.txt")
33
+	err = parseReport(file, func(*ASDescription) error { return nil })
34
+
35
+	if _, ok := err.(ParseError); !ok {
36
+		t.Fatalf("expected parse error, got %s", err)
27 37
 	}
28 38
 }

+ 28
- 10
cmd/dev/dev.go Переглянути файл

@@ -18,7 +18,6 @@ import (
18 18
 func main() {
19 19
 	flag.Parse()
20 20
 	pool = newPool(*redisServer, "")
21
-
22 21
 	mainCidrr()
23 22
 	mainIP2ASN()
24 23
 }
@@ -27,6 +26,7 @@ func mainCidrr() {
27 26
 	cc := internet.CIDRReport{
28 27
 		Date: time.Now(),
29 28
 	}
29
+	log.Printf("DOWNLOAD %v", cc)
30 30
 	err := cc.Download()
31 31
 	if err != nil {
32 32
 		panic(err)
@@ -35,11 +35,14 @@ func mainCidrr() {
35 35
 	if cc.IsDownloaded() {
36 36
 		conn := pool.Get()
37 37
 		defer conn.Close()
38
-		err := cc.Import(conn)
38
+		log.Printf("IMPORT %v", cc)
39
+		start := time.Now()
40
+		n, err := cc.Import(conn)
39 41
 		if err != nil {
40 42
 			panic(err)
41 43
 		}
42
-
44
+		log.Printf("Imported %d rows from %s in %s",
45
+			n, filepath.Base(cc.Path()), time.Since(start))
43 46
 	}
44 47
 }
45 48
 
@@ -58,13 +61,25 @@ func DoLookup() {
58 61
 	q := internet.NewIP2ASNClient(conn)
59 62
 	q2 := internet.NewASN2ASDescClient(conn)
60 63
 	for _, i := range []string{"8.8.8.8", "5.150.255.150", "127.0.0.1"} {
61
-		res := q.Current(i)
64
+		res, err := q.Current(i)
65
+		if err != nil {
66
+			panic(err)
67
+		}
62 68
 		log.Printf("current   : %s: %s ", i, res)
63 69
 		if res != nil {
64
-			log.Printf("desc      : %s", q2.Current(res.ASN))
70
+			cur, err := q2.Current(res.ASN)
71
+			if err != nil {
72
+				panic(err)
73
+			}
74
+			log.Printf("desc      : %s", cur)
65 75
 		}
66 76
 		log.Printf("allhistory: %s", i)
67
-		for _, v := range q.AllHistory(i) {
77
+		allhist, err := q.AllHistory(i)
78
+		if err != nil {
79
+			panic(err)
80
+		}
81
+
82
+		for _, v := range allhist {
68 83
 			log.Println(v.String())
69 84
 		}
70 85
 	}
@@ -91,20 +106,23 @@ func DoIndex() {
91 106
 					log.Println("Recovered in f", r, b.Path())
92 107
 				}
93 108
 			}()
94
-			// log.Printf("DOWNLOAD %s", b.Path())
109
+			log.Printf("DOWNLOAD %s", b.Path())
95 110
 			err := b.Download()
96 111
 			if err != nil {
97 112
 				panic(err)
98 113
 			}
99 114
 			if b.IsDownloaded() {
100
-				// log.Printf("IMPORT %s", b.Path())
115
+				log.Printf("IMPORT %s", b.Path())
101 116
 				conn := pool.Get()
102 117
 				defer conn.Close()
103
-				err := b.Import(conn)
104
-
118
+				start := time.Now()
119
+				n, err := b.Import(conn)
105 120
 				if err != nil {
106 121
 					panic(err)
107 122
 				}
123
+				log.Printf("Imported %d rows from %s in %s",
124
+					n, filepath.Base(b.Path()), time.Since(start))
125
+
108 126
 			}
109 127
 		}(b)
110 128
 	}

+ 12
- 0
internet.go Переглянути файл

@@ -37,10 +37,22 @@ https://github.com/CIRCL/ASN-Description-History.
37 37
 package internet
38 38
 
39 39
 import (
40
+	"fmt"
40 41
 	"os"
41 42
 	"path/filepath"
42 43
 )
43 44
 
45
+type ParseError struct {
46
+	Message string
47
+	Path    string
48
+	LineNum int
49
+	Line    string
50
+}
51
+
52
+func (p ParseError) Error() string {
53
+	return fmt.Sprintf("parse error: %s in %s:%d: %s", p.Message, p.Path, p.LineNum, p.Line)
54
+}
55
+
44 56
 var dataDir = filepath.Join(os.TempDir(), "internet")
45 57
 
46 58
 // SetDataDir sets the storage directory for downloads cache and temporary files.

+ 19
- 16
ip2asn.go Переглянути файл

@@ -32,41 +32,44 @@ type IP2ASNClient struct {
32 32
 }
33 33
 
34 34
 // Current returns the latest known result for an IP2ASN lookup.
35
-func (i *IP2ASNClient) Current(IP string) *ASNResult {
35
+func (i *IP2ASNClient) Current(IP string) (*ASNResult, error) {
36 36
 	ip, err := i.parseIP(IP)
37 37
 	if err != nil {
38
-		return &ASNResult{}
38
+		return &ASNResult{}, err
39 39
 	}
40 40
 	var current string
41 41
 	allDates, err := i.importedDates()
42 42
 	if err != nil {
43
-		return &ASNResult{}
43
+		return &ASNResult{}, err
44 44
 	}
45 45
 
46 46
 	if len(allDates) < 0 {
47
-		return &ASNResult{}
47
+		return &ASNResult{}, err
48 48
 	}
49 49
 	current = allDates[len(allDates)-1]
50 50
 
51
-	results := i.dates(ip, []string{current})
51
+	results, err := i.dates(ip, []string{current})
52
+	if err != nil {
53
+		return nil, err
54
+	}
55
+
52 56
 	if len(results) > 0 {
53
-		return &results[0]
57
+		return &results[0], nil
54 58
 	}
55
-	return nil
59
+	return nil, nil
56 60
 }
57 61
 
58 62
 // AllHistory returns the full history for the given IP address.
59
-func (i *IP2ASNClient) AllHistory(IP string) []ASNResult {
63
+func (i *IP2ASNClient) AllHistory(IP string) ([]ASNResult, error) {
60 64
 	ip, err := i.parseIP(IP)
61 65
 	if err != nil {
62
-		return []ASNResult{}
66
+		return []ASNResult{}, err
63 67
 	}
64 68
 	dates, err := i.importedDates()
65 69
 	if err != nil {
66
-		panic(err)
70
+		return []ASNResult{}, err
67 71
 	}
68
-	result := i.dates(ip, dates)
69
-	return result
72
+	return i.dates(ip, dates)
70 73
 }
71 74
 
72 75
 func (i *IP2ASNClient) parseIP(IP string) (net.IP, error) {
@@ -103,7 +106,7 @@ func (i *IP2ASNClient) importedDates() ([]string, error) {
103 106
 }
104 107
 
105 108
 // dates resolves IP2ASN for all date entries, if available.
106
-func (i *IP2ASNClient) dates(IP net.IP, dates []string) []ASNResult {
109
+func (i *IP2ASNClient) dates(IP net.IP, dates []string) ([]ASNResult, error) {
107 110
 	keys := i.keys(IP)
108 111
 	for _, d := range dates {
109 112
 		for _, k := range keys {
@@ -123,14 +126,14 @@ func (i *IP2ASNClient) dates(IP net.IP, dates []string) []ASNResult {
123 126
 					if err == redis.ErrNil {
124 127
 						continue
125 128
 					}
126
-					panic(err)
129
+					return []ASNResult{}, err
127 130
 				}
128 131
 
129 132
 				timedate, err := time.Parse("20060102", date)
130 133
 				asn, err := strconv.Atoi(r)
131 134
 				if err != nil {
132 135
 					// redis data error
133
-					panic(err)
136
+					return []ASNResult{}, err
134 137
 				}
135 138
 				results = append(results, ASNResult{
136 139
 					Mask: keys[idx],
@@ -144,7 +147,7 @@ func (i *IP2ASNClient) dates(IP net.IP, dates []string) []ASNResult {
144 147
 		}
145 148
 
146 149
 	}
147
-	return results
150
+	return results, nil
148 151
 }
149 152
 
150 153
 // RIPE-NCC-RIS BGP IPv6 Anchor Prefix @RRC00