diff options
Diffstat (limited to 'src/dma/cmd/threshold')
-rw-r--r-- | src/dma/cmd/threshold/evaluate.go | 37 | ||||
-rw-r--r-- | src/dma/cmd/threshold/main.go | 107 | ||||
-rw-r--r-- | src/dma/cmd/threshold/read.go | 82 | ||||
-rw-r--r-- | src/dma/cmd/threshold/transmit.go | 97 |
4 files changed, 323 insertions, 0 deletions
diff --git a/src/dma/cmd/threshold/evaluate.go b/src/dma/cmd/threshold/evaluate.go new file mode 100644 index 00000000..8c961253 --- /dev/null +++ b/src/dma/cmd/threshold/evaluate.go @@ -0,0 +1,37 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +func evaluate(config *Config, rdlist []rawData) []evalData { + edlist := []evalData{} + + for _, rd := range rdlist { + maxVal := 0 + for _, val := range rd.datalist { + if maxVal < val { + maxVal = val + } + } + + if maxVal > config.Threshold.Min { + edlist = append(edlist, evalData{rd.key, 1}) + } else { + edlist = append(edlist, evalData{rd.key, 0}) + } + } + return edlist +} diff --git a/src/dma/cmd/threshold/main.go b/src/dma/cmd/threshold/main.go new file mode 100644 index 00000000..b98328d1 --- /dev/null +++ b/src/dma/cmd/threshold/main.go @@ -0,0 +1,107 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "github.com/BurntSushi/toml" + "log" + "sync" + "time" +) + +// Config is ... +type Config struct { + Common CommonConfig + Threshold ThresholdConfig +} + +// CommonConfig is ... +type CommonConfig struct { + RedisHost string `toml:"redis_host"` + RedisPort string `toml:"redis_port"` + RedisPassword string `toml:"redis_password"` + RedisDB int `toml:"redis_db"` +} + +// ThresholdConfig is ... +type ThresholdConfig struct { + RedisHost string `toml:"redis_host"` + RedisPort string `toml:"redis_port"` + RedisPassword string `toml:"redis_password"` + RedisDB int `toml:"redis_db"` + + Interval int `toml:"interval"` + Min int `toml:"min"` + + CollectdPlugin string `toml:"collectd_plugin"` + CollectdType string `toml:"collectd_type"` +} + +type rawData struct { + key string + datalist []int +} + +type evalData struct { + key string + label int +} + +func main() { + var config Config + _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config) + if err != nil { + log.Fatalf("Read error of config: %s", err) + } + + thresConfig := config.Threshold + log.Printf("Raw data redis config Addr:%s:%s DB:%d", thresConfig.RedisHost, thresConfig.RedisPort, thresConfig.RedisDB) + if thresConfig.RedisPassword == "" { + log.Printf("Raw data redis password is not set") + } + annoConfig := config.Common + log.Printf("Annotate redis config Addr:%s:%s DB:%d", annoConfig.RedisHost, annoConfig.RedisPort, annoConfig.RedisDB) + if annoConfig.RedisPassword == "" { + log.Printf("Annotate redis password is not set") + } + + var waitgroup sync.WaitGroup + ctx := context.Background() + + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + ticker := time.NewTicker(time.Duration(config.Threshold.Interval) * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + result1 := read(&config) + // analysis() + result2 := evaluate(&config, result1) + transmit(&config, result2) + } + } + }() + + waitgroup.Wait() + + fmt.Println("End") +} diff --git a/src/dma/cmd/threshold/read.go b/src/dma/cmd/threshold/read.go new file mode 100644 index 00000000..0b0cf5a1 --- /dev/null +++ b/src/dma/cmd/threshold/read.go @@ -0,0 +1,82 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "github.com/go-redis/redis" + "os" + "strconv" + "strings" + "time" +) + +// e.g. collectd/instance-00000001/virt/if_octets-tapd21acb51-35 +const redisKey = "collectd/*/virt/if_octets-*" + +func zrangebyscore(config *Config, client *redis.Client, key string) []int { + + unixNow := int(time.Now().Unix()) + + val, err := client.ZRangeByScore(key, redis.ZRangeBy{ + Min: strconv.Itoa(unixNow - config.Threshold.Interval), + Max: strconv.Itoa(unixNow), + }).Result() + + datalist := []int{} + + if err == redis.Nil { + fmt.Println("this key is not exist") + os.Exit(1) + } else if err != nil { + panic(err) + } else { + for _, strVal := range val { + split := strings.Split(strVal, ":") + txVal := split[2] + floatVal, err := strconv.ParseFloat(txVal, 64) + if err != nil { + os.Exit(1) + } + datalist = append(datalist, int(floatVal)) + } + } + return datalist +} + +func read(config *Config) []rawData { + thresConfig := config.Threshold + + client := redis.NewClient(&redis.Options{ + Addr: thresConfig.RedisHost + ":" + thresConfig.RedisPort, + Password: thresConfig.RedisPassword, + DB: thresConfig.RedisDB, + }) + + keys, err := client.Keys(redisKey).Result() + if err != nil { + panic(err) + } + + rdlist := []rawData{} + + for _, key := range keys { + rdlist = append(rdlist, rawData{key, zrangebyscore(config, client, key)}) + } + + return rdlist +} diff --git a/src/dma/cmd/threshold/transmit.go b/src/dma/cmd/threshold/transmit.go new file mode 100644 index 00000000..8cac2a88 --- /dev/null +++ b/src/dma/cmd/threshold/transmit.go @@ -0,0 +1,97 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "fmt" + "github.com/distributed-monitoring/agent/pkg/common" + "github.com/go-redis/redis" + "strconv" + "strings" + "time" +) + +type collectdNotifier struct { + pluginName string + typeName string +} + +func send(cn collectdNotifier, message string, severity string, metaData [][2]string) error { + unixNow := float64(time.Now().UnixNano()) / 1000000000 + + var metaDataStr bytes.Buffer + for _, data := range metaData { + metaDataStr.WriteString(" s:") + metaDataStr.WriteString(data[0]) + metaDataStr.WriteString("=\"") + metaDataStr.WriteString(strings.Replace(data[1], "\"", "\\\"", -1)) + metaDataStr.WriteString("\"") + } + + fmt.Printf("PUTNOTIF message=\"%s\" severity=%s time=%f "+ + "host=localhost plugin=%s type=%s %s\n", + message, severity, unixNow, cn.pluginName, cn.typeName, metaDataStr.String()) + + return nil +} + +func transmit(config *Config, edlist []evalData) { + annoConfig := config.Common + + client := redis.NewClient(&redis.Options{ + Addr: annoConfig.RedisHost + ":" + annoConfig.RedisPort, + Password: annoConfig.RedisPassword, + DB: annoConfig.RedisDB, + }) + pool := common.RedisPool{Client: client} + + notifier := collectdNotifier{ + pluginName: config.Threshold.CollectdPlugin, + typeName: config.Threshold.CollectdType} + + for _, ed := range edlist { + if ed.label == 1 { + + fmt.Println("kick action") + + item := strings.Split(ed.key, "/") + ifItem := strings.SplitN(item[3], "-", 2) + virtName := item[1] + virtIF := ifItem[1] + + var message bytes.Buffer + message.WriteString("Value exceeded threshold ") + message.WriteString(strconv.Itoa(config.Threshold.Min)) + message.WriteString(".") + + nameVal, _ := pool.Get(fmt.Sprintf("%s/%s/vminfo", "vm", virtName)) + ifVal, _ := pool.Get(fmt.Sprintf("%s/%s/neutron_network", "if", virtIF)) + + nameInfo := fmt.Sprintf("{\"%s\": %s}", virtName, nameVal) + ifInfo := fmt.Sprintf("{\"%s\": %s}", virtIF, ifVal) + + fmt.Println(nameInfo) + fmt.Println(ifInfo) + + send(notifier, message.String(), + "warning", + [][2]string{{"vminfo", nameInfo}, {"neutron_network", ifInfo}}) + + } + } +} |