Skip to content

Commit decee36

Browse files
authored
Merge pull request #7 from bubble-diff/dev
修复一些bug并支持汇报部署信息
2 parents b839abe + 0ef9072 commit decee36

File tree

3 files changed

+119
-6
lines changed

3 files changed

+119
-6
lines changed

config.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
package main
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"fmt"
7+
"net/http"
68
"os"
9+
"sync"
10+
"time"
711

812
"github.com/sirupsen/logrus"
913
)
@@ -22,6 +26,9 @@ type config struct {
2226

2327
// DeviceIPv4 网卡ipv4地址
2428
DeviceIPv4 string
29+
30+
mu sync.Mutex
31+
isTaskRunning bool
2532
}
2633

2734
var configuration = config{}
@@ -47,8 +54,95 @@ func (c *config) init() {
4754
logrus.Fatalf("%s: this device has no ipv4 address.", c.Device)
4855
}
4956

57+
// 启动后台去执行一些自动更新的动作
58+
go func() {
59+
logrus.Info("configuration background started.")
60+
for {
61+
c.setIsTaskRunning()
62+
c.reportDepolyed()
63+
time.Sleep(5 * time.Second)
64+
}
65+
}()
66+
5067
logrus.WithField(
5168
"configuration",
5269
fmt.Sprintf("%+v", configuration),
5370
).Debug("configuration initialized.")
5471
}
72+
73+
func (c *config) getIsTaskRunning() bool {
74+
c.mu.Lock()
75+
defer c.mu.Unlock()
76+
return c.isTaskRunning
77+
}
78+
79+
func (c *config) setIsTaskRunning() {
80+
var err error
81+
var apiResp struct {
82+
Err string `json:"err"`
83+
IsRunning bool `json:"is_running"`
84+
}
85+
86+
logrus.Infof("[setIsTaskRunning] updating TaskID=%d status...", c.Taskid)
87+
// call bubblereplay
88+
api := fmt.Sprintf("http://%s%s/%d", c.ReplaySvrAddr, ApiTaskStatus, c.Taskid)
89+
resp, err := http.Get(api)
90+
if err != nil {
91+
logrus.Errorf("[setIsTaskRunning] call api failed, %s", err)
92+
return
93+
}
94+
95+
err = json.NewDecoder(resp.Body).Decode(&apiResp)
96+
if err != nil {
97+
logrus.Errorf("[setIsTaskRunning] decode json response failed, %s", err)
98+
return
99+
}
100+
resp.Body.Close()
101+
102+
if apiResp.Err != "" {
103+
logrus.Errorf("[setIsTaskRunning] response return error, %s", err)
104+
return
105+
}
106+
107+
// update state
108+
c.mu.Lock()
109+
defer c.mu.Unlock()
110+
c.isTaskRunning = apiResp.IsRunning
111+
}
112+
113+
func (c *config) reportDepolyed() {
114+
var err error
115+
var apiBody = struct {
116+
TaskID int64 `json:"task_id"`
117+
// Addr 基准服务地址
118+
Addr string `json:"addr"`
119+
}{
120+
TaskID: c.Taskid,
121+
Addr: c.DeviceIPv4,
122+
}
123+
var apiResp struct {
124+
Err string `json:"err"`
125+
}
126+
127+
logrus.Infof("[reportDepolyed] reporting TaskID=%d has been deployed the bubblecopy...", c.Taskid)
128+
// call bubblereplay
129+
api := fmt.Sprintf("http://%s%s", c.ReplaySvrAddr, ApiSetDeployed)
130+
data, err := json.Marshal(apiBody)
131+
resp, err := http.Post(api, "application/json", bytes.NewReader(data))
132+
if err != nil {
133+
logrus.Errorf("[reportDepolyed] call api failed, %s", err)
134+
return
135+
}
136+
137+
err = json.NewDecoder(resp.Body).Decode(&apiResp)
138+
if err != nil {
139+
logrus.Errorf("[reportDepolyed] decode json response failed, %s", err)
140+
return
141+
}
142+
resp.Body.Close()
143+
144+
if apiResp.Err != "" {
145+
logrus.Errorf("[reportDepolyed] response return error, %s", err)
146+
return
147+
}
148+
}

const.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,6 @@ const (
1414

1515
const (
1616
ApiAddRecord = "/record/add"
17+
ApiTaskStatus = "/task_status"
18+
ApiSetDeployed = "/deploy"
1719
)

main.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,26 +69,43 @@ func main() {
6969

7070
ticker := time.NewTicker(time.Second * 30)
7171
for {
72-
// todo: 等待Diff任务启动,若未启动,请勿进行抓包消耗CPU
73-
// your code here...
74-
72+
skip := false
7573
done := false
74+
75+
// 等待Diff任务启动,若未启动,请勿进行抓包消耗CPU
76+
if configuration.getIsTaskRunning() == false {
77+
logrus.Infof("TaskID=%d is not running, self sleeping...", configuration.Taskid)
78+
skip = true
79+
}
80+
7681
select {
7782
case <-signalChan:
7883
logrus.Info("Caught SIGINT: aborting")
7984
done = true
8085
case <-ticker.C:
8186
// 停止监听30秒内无数据传输的连接
8287
assembler.FlushCloseOlderThan(time.Now().Add(time.Second * -30))
88+
default:
89+
// nop
90+
}
91+
92+
if done {
93+
break
94+
}
95+
if skip {
96+
time.Sleep(time.Second)
97+
continue
98+
}
99+
100+
select {
83101
case packet := <-source.Packets():
84102
tcp := packet.Layer(layers.LayerTypeTCP)
85103
if tcp != nil {
86104
tcp := tcp.(*layers.TCP)
87105
assembler.Assemble(packet.NetworkLayer().NetworkFlow(), tcp)
88106
}
89-
}
90-
if done {
91-
break
107+
default:
108+
// nop
92109
}
93110
}
94111

0 commit comments

Comments
 (0)