summaryrefslogtreecommitdiffstats
path: root/src/dma/cmd
diff options
context:
space:
mode:
authorToshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>2018-09-06 09:04:29 +0000
committerToshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>2018-09-07 06:03:01 +0000
commitd61931341176dad9ccff7c967a10d88fe54218fa (patch)
tree526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/cmd
parent73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff)
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/cmd')
-rw-r--r--src/dma/cmd/infofetch/daemon.go103
-rw-r--r--src/dma/cmd/infofetch/openstack.go522
-rw-r--r--src/dma/cmd/infofetch/virsh_domain.go264
-rw-r--r--src/dma/cmd/server/agent.go31
-rw-r--r--src/dma/cmd/server/amqp.go108
-rw-r--r--src/dma/cmd/server/api.go85
-rw-r--r--src/dma/cmd/server/main.go85
-rw-r--r--src/dma/cmd/threshold/evaluate.go37
-rw-r--r--src/dma/cmd/threshold/main.go107
-rw-r--r--src/dma/cmd/threshold/read.go82
-rw-r--r--src/dma/cmd/threshold/transmit.go97
11 files changed, 1521 insertions, 0 deletions
diff --git a/src/dma/cmd/infofetch/daemon.go b/src/dma/cmd/infofetch/daemon.go
new file mode 100644
index 00000000..d4ff94f5
--- /dev/null
+++ b/src/dma/cmd/infofetch/daemon.go
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "github.com/BurntSushi/toml"
+ "github.com/distributed-monitoring/agent/pkg/common"
+ "github.com/go-redis/redis"
+ libvirt "github.com/libvirt/libvirt-go"
+ "log"
+ "sync"
+)
+
+var infoPool common.RedisPool
+
+// Config is ...
+type Config struct {
+ Common CommonConfig
+ InfoFetch InfoFetchConfig
+}
+
+// CommonConfig is ...
+type CommonConfig struct {
+ RedisHost string `toml:"redis_host"`
+ RedisPort string `toml:"redis_port"`
+ RedisPassword string `toml:"redis_password"`
+ RedisDB int `toml:"redis_db"`
+}
+
+// InfoFetchConfig is ...
+type InfoFetchConfig struct {
+ OSUsername string `toml:"os_username"`
+ OSUserDomainName string `toml:"os_user_domain_name"`
+ OSProjectDomainName string `toml:"os_project_domain_name"`
+ OSProjectName string `toml:"os_project_name"`
+ OSPassword string `toml:"os_password"`
+ OSAuthURL string `toml:"os_auth_url"`
+}
+
+func main() {
+
+ var config Config
+ _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config)
+ if err != nil {
+ log.Println("Read error of config file")
+ }
+
+ var waitgroup sync.WaitGroup
+ libvirt.EventRegisterDefaultImpl()
+
+ redisClient := redis.NewClient(&redis.Options{
+ Addr: config.Common.RedisHost + ":" + config.Common.RedisPort,
+ Password: config.Common.RedisPassword,
+ DB: config.Common.RedisDB,
+ })
+ infoPool = common.RedisPool{Client: redisClient}
+ // Initialize redis db...
+ infoPool.DelAll()
+
+ conn, err := libvirt.NewConnect("qemu:///system")
+ if err != nil {
+ log.Fatalln("libvirt connect error")
+ }
+ defer conn.Close()
+
+ vmIfInfoChan := make(chan string)
+ {
+ ctx := context.Background()
+ waitgroup.Add(1)
+ go func() {
+ RunNeutronInfoFetch(ctx, &config, vmIfInfoChan)
+ waitgroup.Done()
+ }()
+ }
+
+ //Get active VM info
+ GetActiveDomain(conn, vmIfInfoChan)
+ {
+ ctx := context.Background()
+ waitgroup.Add(1)
+ go func() {
+ RunVirshEventLoop(ctx, conn, vmIfInfoChan)
+ waitgroup.Done()
+ }()
+ }
+
+ waitgroup.Wait()
+}
diff --git a/src/dma/cmd/infofetch/openstack.go b/src/dma/cmd/infofetch/openstack.go
new file mode 100644
index 00000000..c0c54f5a
--- /dev/null
+++ b/src/dma/cmd/infofetch/openstack.go
@@ -0,0 +1,522 @@
+/*
+ * Copyright 2017 Red Hat
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/url"
+ "os"
+ "strings"
+ "text/template"
+ "time"
+)
+
+var env *InfoFetchConfig
+
+type userInfo struct {
+ UserDomainName string
+ UserName string
+ Password string
+ ProjectDomainName string
+ ProjectName string
+}
+
+var tokenJSONTemplate = `{
+ "auth": {
+ "identity": {
+ "methods": [
+ "password"
+ ],
+ "password": {
+ "user": {
+ "domain": {
+ "name": "{{.UserDomainName}}"
+ },
+ "name": "{{.UserName}}",
+ "password": "{{.Password}}"
+ }
+ }
+ },
+ "scope": {
+ "project": {
+ "domain": {
+ "name": "{{.ProjectDomainName}}"
+ },
+ "name": "{{.ProjectName}}"
+ }
+ }
+ }
+}
+`
+
+type tokenReply struct {
+ Token struct {
+ IsDomain bool `json:"is_domain"`
+ Methods []string `json:"methods"`
+ Roles []struct {
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"roles"`
+ ExpiresAt time.Time `json:"expires_at"`
+ Project struct {
+ Domain struct {
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"domain"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"project"`
+ User struct {
+ PasswordExpiresAt interface{} `json:"password_expires_at"`
+ Domain struct {
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"domain"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"user"`
+ AuditIds []string `json:"audit_ids"`
+ IssuedAt time.Time `json:"issued_at"`
+ } `json:"token"`
+}
+
+type token struct {
+ Token string
+ ExpiresAt time.Time
+}
+
+func (t *token) CheckToken() {
+ now := time.Now()
+
+ if t.ExpiresAt.Sub(now).Seconds() < 30 {
+ newToken, _ := getToken()
+ t.Token = newToken.Token
+ t.ExpiresAt = newToken.ExpiresAt
+ }
+}
+
+func getToken() (*token, error) {
+ var buf bytes.Buffer
+
+ t := template.Must(template.New("json template1").Parse(tokenJSONTemplate))
+ p := userInfo{
+ UserDomainName: env.OSUserDomainName,
+ UserName: env.OSUsername,
+ Password: env.OSPassword,
+ ProjectDomainName: env.OSProjectDomainName,
+ ProjectName: env.OSProjectName,
+ }
+ t.Execute(&buf, p)
+
+ body := bytes.NewReader(buf.Bytes())
+ req, err := http.NewRequest("POST", env.OSAuthURL+"/auth/tokens?nocatalog", body)
+ if err != nil {
+ return &token{"", time.Unix(0, 0)}, fmt.Errorf("http request failed: %v", err)
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return &token{"", time.Unix(0, 0)}, fmt.Errorf("http POST failed: %v", err)
+ }
+ defer resp.Body.Close()
+ b, err := ioutil.ReadAll(resp.Body)
+
+ tokenStr, ok := resp.Header["X-Subject-Token"]
+ if ok != true && len(tokenStr) != 1 {
+ return &token{"", time.Unix(0, 0)}, fmt.Errorf("no token in openstack reply")
+ }
+
+ var repl tokenReply
+ err = json.Unmarshal(b, &repl)
+
+ return &token{tokenStr[0], repl.Token.ExpiresAt}, nil
+}
+
+type service struct {
+ Description string `json:"description"`
+ Links struct {
+ Self string `json:"self"`
+ } `json:"links"`
+ Enabled bool `json:"enabled"`
+ Type string `json:"type"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+}
+
+type serviceListReply struct {
+ Services []service `json:"services"`
+}
+
+func (s *serviceListReply) GetService(name string) (*service, error) {
+ for _, v := range s.Services {
+ if v.Name == name {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no service id (%s) found", name)
+}
+
+type endPoint struct {
+ RegionID string `json:"region_id"`
+ Links struct {
+ Self string `json:"self"`
+ } `json:"links"`
+ URL string `json:"url"`
+ Region string `json:"region"`
+ Enabled bool `json:"enabled"`
+ Interface string `json:"interface"`
+ ServiceID string `json:"service_id"`
+ ID string `json:"id"`
+}
+
+type endPointReply struct {
+ Endpoints []endPoint `json:"endpoints"`
+}
+
+func (e *endPointReply) GetEndpoint(serviceid string, ifname string) (*endPoint, error) {
+ for _, v := range e.Endpoints {
+ if v.Interface == ifname && v.ServiceID == serviceid {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no endpoint found (%s/%s)", serviceid, ifname)
+}
+
+func getEndpoints(token *token) (endPointReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", env.OSAuthURL+"/endpoints", nil)
+ if err != nil {
+ return endPointReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return endPointReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+ //fmt.Printf("%s", string(b))
+
+ var repl endPointReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return endPointReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ //fmt.Printf("%v", repl)
+ return repl, nil
+}
+
+func getServiceList(token *token) (serviceListReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", env.OSAuthURL+"/services", nil)
+ if err != nil {
+ return serviceListReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return serviceListReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl serviceListReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return serviceListReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ return repl, nil
+}
+
+type neutronPort struct {
+ AllowedAddressPairs []interface{} `json:"allowed_address_pairs"`
+ ExtraDhcpOpts []interface{} `json:"extra_dhcp_opts"`
+ UpdatedAt time.Time `json:"updated_at"`
+ DeviceOwner string `json:"device_owner"`
+ RevisionNumber int `json:"revision_number"`
+ PortSecurityEnabled bool `json:"port_security_enabled"`
+ BindingProfile struct {
+ } `json:"binding:profile"`
+ FixedIps []struct {
+ SubnetID string `json:"subnet_id"`
+ IPAddress string `json:"ip_address"`
+ } `json:"fixed_ips"`
+ ID string `json:"id"`
+ SecurityGroups []interface{} `json:"security_groups"`
+ BindingVifDetails struct {
+ PortFilter bool `json:"port_filter"`
+ DatapathType string `json:"datapath_type"`
+ OvsHybridPlug bool `json:"ovs_hybrid_plug"`
+ } `json:"binding:vif_details"`
+ BindingVifType string `json:"binding:vif_type"`
+ MacAddress string `json:"mac_address"`
+ ProjectID string `json:"project_id"`
+ Status string `json:"status"`
+ BindingHostID string `json:"binding:host_id"`
+ Description string `json:"description"`
+ Tags []interface{} `json:"tags"`
+ QosPolicyID interface{} `json:"qos_policy_id"`
+ Name string `json:"name"`
+ AdminStateUp bool `json:"admin_state_up"`
+ NetworkID string `json:"network_id"`
+ TenantID string `json:"tenant_id"`
+ CreatedAt time.Time `json:"created_at"`
+ BindingVnicType string `json:"binding:vnic_type"`
+ DeviceID string `json:"device_id"`
+}
+
+type neutronPortReply struct {
+ Ports []neutronPort `json:"ports"`
+}
+
+func getNeutronPorts(token *token, endpoint string) (neutronPortReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", endpoint+"/v2.0/ports", nil)
+ if err != nil {
+ return neutronPortReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return neutronPortReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl neutronPortReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return neutronPortReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ return repl, nil
+}
+
+func (n *neutronPortReply) GetNeutronPortfromMAC(mac string) (*neutronPort,
+ error) {
+ for _, v := range n.Ports {
+ if v.MacAddress == strings.ToLower(mac) {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no port (%s) found", mac)
+}
+
+type neutronNetwork struct {
+ ProviderPhysicalNetwork string `json:"provider:physical_network"`
+ Ipv6AddressScope interface{} `json:"ipv6_address_scope"`
+ RevisionNumber int `json:"revision_number"`
+ PortSecurityEnabled bool `json:"port_security_enabled"`
+ Mtu int `json:"mtu"`
+ ID string `json:"id"`
+ RouterExternal bool `json:"router:external"`
+ AvailabilityZoneHints []interface{} `json:"availability_zone_hints"`
+ AvailabilityZones []string `json:"availability_zones"`
+ ProviderSegmentationID interface{} `json:"provider:segmentation_id"`
+ Ipv4AddressScope interface{} `json:"ipv4_address_scope"`
+ Shared bool `json:"shared"`
+ ProjectID string `json:"project_id"`
+ Status string `json:"status"`
+ Subnets []string `json:"subnets"`
+ Description string `json:"description"`
+ Tags []interface{} `json:"tags"`
+ UpdatedAt time.Time `json:"updated_at"`
+ IsDefault bool `json:"is_default"`
+ QosPolicyID interface{} `json:"qos_policy_id"`
+ Name string `json:"name"`
+ AdminStateUp bool `json:"admin_state_up"`
+ TenantID string `json:"tenant_id"`
+ CreatedAt time.Time `json:"created_at"`
+ ProviderNetworkType string `json:"provider:network_type"`
+}
+
+type neutronNetworkReply struct {
+ Networks []neutronNetwork `json:"networks"`
+}
+
+func (n *neutronNetworkReply) GetNetworkFromID(netid string) (*neutronNetwork, error) {
+ for _, v := range n.Networks {
+ if v.ID == netid {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no network (%s) found", netid)
+}
+
+func getNetworkReply(token *token, endpoint string) (neutronNetworkReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", endpoint+"/v2.0/networks", nil)
+ if err != nil {
+ return neutronNetworkReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return neutronNetworkReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl neutronNetworkReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return neutronNetworkReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ return repl, nil
+}
+
+type novaCompute struct {
+ ID string `json:"id"`
+ Links []struct {
+ Href string `json:"href"`
+ Rel string `json:"rel"`
+ } `json:"links"`
+ Name string `json:"name"`
+}
+
+type novaComputeReply struct {
+ Servers []novaCompute `json:"servers"`
+}
+
+func (n *novaComputeReply) GetComputeFromID(vmid string) (*novaCompute, error) {
+ for _, v := range n.Servers {
+ if v.ID == vmid {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no vm (%s) found", vmid)
+}
+
+func getComputeReply(token *token, endpoint string) (novaComputeReply, error) {
+ token.CheckToken()
+ values := url.Values{}
+ values.Add("all_tenants", "1")
+
+ req, err := http.NewRequest("GET", endpoint+"/servers", nil)
+ if err != nil {
+ return novaComputeReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+ req.URL.RawQuery = values.Encode()
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return novaComputeReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl novaComputeReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return novaComputeReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+
+ return repl, nil
+}
+
+type osNeutronInterfaceAnnotation struct {
+ IfName string
+ VMName string
+ NetworkName string
+}
+
+// RunNeutronInfoFetch gets redis key update from libvirt and get network information
+// from Neutron with REST api. The retrieved information is stored under redis,
+// if/<tap name>/neutron_network
+func RunNeutronInfoFetch(ctx context.Context, config *Config, vmIfInfo chan string) error {
+ env = &config.InfoFetch
+ token, err := getToken()
+
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "cannot get token: %v", err)
+ return err
+ }
+
+ svc, _ := getServiceList(token)
+ neuID, _ := svc.GetService("neutron")
+ //fmt.Printf("neutron id:%s\n", id.ID)
+
+ novaID, _ := svc.GetService("nova")
+ //fmt.Printf("nova id:%s\n", id.ID)
+
+ endpoints, _ := getEndpoints(token)
+ neuEndpoint, _ := endpoints.GetEndpoint(neuID.ID, "admin")
+ //fmt.Printf("neutron endpoint:%s\n", neuEndpoint.URL)
+
+ novaEndpoint, _ := endpoints.GetEndpoint(novaID.ID, "admin")
+ //fmt.Printf("nova endpoint:%s\n", novaEndpoint.URL)
+
+ getComputeReply(token, novaEndpoint.URL)
+ getNeutronPorts(token, neuEndpoint.URL)
+ //vmrepl, _ := getComputeReply(token, novaEndpoint.URL)
+ //prepl, _ := getNeutronPorts(token, neuEndpoint.URL)
+
+EVENTLOOP:
+ for {
+ select {
+ case <-ctx.Done():
+ break EVENTLOOP
+ case key := <-vmIfInfo:
+ log.Printf("Incoming IF: %v", key)
+ libvirtIfInfo, err := infoPool.Get(key)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ var ifInfo osVMInterfaceAnnotation
+ err = json.Unmarshal([]byte(libvirtIfInfo), &ifInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ vmrepl, _ := getComputeReply(token, novaEndpoint.URL)
+ prepl, _ := getNeutronPorts(token, neuEndpoint.URL)
+ nrepl, _ := getNetworkReply(token, neuEndpoint.URL)
+ netid, _ := prepl.GetNeutronPortfromMAC(ifInfo.MacAddr)
+ net, _ := nrepl.GetNetworkFromID(netid.NetworkID)
+ vm, _ := vmrepl.GetComputeFromID(netid.DeviceID)
+ osIfInfo := osNeutronInterfaceAnnotation{
+ IfName: ifInfo.Target,
+ VMName: vm.Name,
+ NetworkName: net.Name}
+
+ osIfInfoJSON, err := json.Marshal(osIfInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ log.Printf("Get: vmname: %s / networkname:%s", vm.Name, net.Name)
+ infoPool.Set(fmt.Sprintf("if/%s/%s", ifInfo.Target, "neutron_network"), string(osIfInfoJSON))
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/src/dma/cmd/infofetch/virsh_domain.go b/src/dma/cmd/infofetch/virsh_domain.go
new file mode 100644
index 00000000..b79f5bdd
--- /dev/null
+++ b/src/dma/cmd/infofetch/virsh_domain.go
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2017 Red Hat
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "encoding/xml"
+ "fmt"
+ libvirt "github.com/libvirt/libvirt-go"
+ "log"
+)
+
+type instance struct {
+ Name string `xml:"name"`
+ Owner struct {
+ User string `xml:"user"`
+ Project string `xml:"project"`
+ } `xml:"owner"`
+ Flavor struct {
+ Name string `xml:"name,attr"`
+ } `xml:"flavor"`
+}
+
+type domain struct {
+ Name string `xml:"name"`
+ Devices struct {
+ Interfaces []struct {
+ Type string `xml:"type,attr"`
+ Mac struct {
+ Address string `xml:"address,attr"`
+ } `xml:"mac"`
+ Target struct {
+ Dev string `xml:"dev,attr"`
+ } `xml:"target"`
+ } `xml:"interface"`
+ } `xml:"devices"`
+}
+
+type osVMAnnotation struct {
+ Name string
+ Owner string
+ Project string
+ Flavor string
+}
+
+type osVMInterfaceAnnotation struct {
+ Type string
+ MacAddr string
+ Target string
+ VMName string
+}
+
+func parseNovaMetadata(metadata string) (*osVMAnnotation, error) {
+ data := new(instance)
+
+ if err := xml.Unmarshal([]byte(metadata), data); err != nil {
+ log.Println("XML Unmarshal error:", err)
+ return nil, err
+ }
+ log.Printf("Get name: %s user: %s, project: %s, flavor: %s", data.Name, data.Owner.User, data.Owner.Project, data.Flavor.Name)
+ return &osVMAnnotation{
+ Name: data.Name,
+ Owner: data.Owner.User,
+ Project: data.Owner.Project,
+ Flavor: data.Flavor.Name}, nil
+}
+
+func parseXMLForMAC(dumpxml string) (*[]osVMInterfaceAnnotation, error) {
+ data := new(domain)
+
+ if err := xml.Unmarshal([]byte(dumpxml), data); err != nil {
+ log.Println("XML Unmarshal error:", err)
+ return nil, err
+ }
+
+ ifAnnotation := make([]osVMInterfaceAnnotation, len(data.Devices.Interfaces))
+ for i, v := range data.Devices.Interfaces {
+ log.Printf("Interface type: %s, mac_addr: %s, target_dev: %s", v.Type, v.Mac.Address, v.Target.Dev)
+ ifAnnotation[i] = osVMInterfaceAnnotation{
+ Type: v.Type,
+ MacAddr: v.Mac.Address,
+ Target: v.Target.Dev,
+ VMName: data.Name}
+ }
+ return &ifAnnotation, nil
+}
+
+func setInterfaceAnnotation(ifInfo *[]osVMInterfaceAnnotation, vmIfInfoChan chan string) {
+ for _, v := range *ifInfo {
+ ifInfoJSON, err := json.Marshal(v)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ infoPool.Set(fmt.Sprintf("if/%s/%s", v.Target, "network"), string(ifInfoJSON))
+
+ vmIfInfoChan <- fmt.Sprintf("if/%s/%s", v.Target, "network")
+ }
+ return
+}
+
+func domainEventLifecycleCallback(vmIfInfo chan string) func(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventLifecycle) {
+
+ return func(c *libvirt.Connect,
+ d *libvirt.Domain, event *libvirt.DomainEventLifecycle) {
+ domName, _ := d.GetName()
+
+ switch event.Event {
+ case libvirt.DOMAIN_EVENT_DEFINED:
+ // VM defined: vmname (libvirt, nova), user, project, flavor
+ // Redis: <vnname>/vminfo
+ log.Printf("Event defined: domName: %s, event: %v", domName, event)
+ metadata, err := d.GetMetadata(libvirt.DOMAIN_METADATA_ELEMENT, "http://openstack.org/xmlns/libvirt/nova/1.0", libvirt.DOMAIN_AFFECT_CONFIG)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ vmInfo, err := parseNovaMetadata(metadata)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ vmInfoJSON, err := json.Marshal(vmInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", domName, "vminfo"), string(vmInfoJSON))
+ case libvirt.DOMAIN_EVENT_STARTED:
+ // VM started: interface type, interface mac addr, intarface type
+ // Redis: <vnname>/interfaces
+ log.Printf("Event started: domName: %s, event: %v", domName, event)
+
+ xml, err := d.GetXMLDesc(0)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ ifInfo, err := parseXMLForMAC(xml)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ setInterfaceAnnotation(ifInfo, vmIfInfo)
+
+ ifInfoJSON, err := json.Marshal(ifInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", domName, "interfaces"), string(ifInfoJSON))
+ case libvirt.DOMAIN_EVENT_UNDEFINED:
+ log.Printf("Event undefined: domName: %s, event: %v", domName, event)
+ vmIFInfo, err := infoPool.Get(fmt.Sprintf("vm/%s/%s", domName, "interfaces"))
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ var interfaces []osVMInterfaceAnnotation
+ err = json.Unmarshal([]byte(vmIFInfo), &interfaces)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ for _, v := range interfaces {
+ infoPool.Del(fmt.Sprintf("if/%s/%s", v.Target, "network"))
+ infoPool.Del(fmt.Sprintf("if/%s/%s", v.Target, "neutron_network"))
+ }
+ }
+ }
+ infoPool.Del(fmt.Sprintf("vm/%s/%s", domName, "vminfo"))
+ infoPool.Del(fmt.Sprintf("vm/%s/%s", domName, "interfaces"))
+ default:
+ log.Printf("Event misc: domName: %s, event: %v", domName, event)
+ }
+ }
+}
+
+// GetActiveDomain gets all active domain information from libvirt and it should be called at startup to get
+// current running domain information
+func GetActiveDomain(conn *libvirt.Connect, vmIfInfoChan chan string) error {
+ doms, err := conn.ListAllDomains(libvirt.CONNECT_LIST_DOMAINS_ACTIVE)
+ if err != nil {
+ log.Fatalf("libvirt dom list error: %s", err)
+ return err
+ }
+
+ for _, d := range doms {
+ name, err := d.GetName()
+
+ // Get VM Info
+ metadata, err := d.GetMetadata(libvirt.DOMAIN_METADATA_ELEMENT, "http://openstack.org/xmlns/libvirt/nova/1.0", libvirt.DOMAIN_AFFECT_CONFIG)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ vmInfo, err := parseNovaMetadata(metadata)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ vmInfoJSON, err := json.Marshal(vmInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", name, "vminfo"), string(vmInfoJSON))
+
+ // Get Network info
+ xml, err := d.GetXMLDesc(0)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ ifInfo, err := parseXMLForMAC(xml)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ setInterfaceAnnotation(ifInfo, vmIfInfoChan)
+
+ ifInfoJSON, err := json.Marshal(ifInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", name, "interfaces"), string(ifInfoJSON))
+ }
+ return nil
+}
+
+// RunVirshEventLoop is event loop to watch libvirt update
+func RunVirshEventLoop(ctx context.Context, conn *libvirt.Connect, vmIfInfoChan chan string) error {
+ callbackID, err := conn.DomainEventLifecycleRegister(nil, domainEventLifecycleCallback(vmIfInfoChan))
+ if err != nil {
+ log.Fatalf("Err: callbackid: %d %v", callbackID, err)
+ }
+
+ libvirt.EventAddTimeout(5000, func(timer int) { return }) // 5000 = 5sec
+ log.Printf("Entering libvirt event loop()")
+EVENTLOOP:
+ for {
+ select {
+ case <-ctx.Done():
+ break EVENTLOOP
+ default:
+ if err := libvirt.EventRunDefaultImpl(); err != nil {
+ log.Fatalf("%v", err)
+ }
+ }
+ }
+ log.Printf("Quitting libvirt event loop()")
+
+ if err := conn.DomainEventDeregister(callbackID); err != nil {
+ log.Fatalf("%v", err)
+ }
+ return nil
+}
diff --git a/src/dma/cmd/server/agent.go b/src/dma/cmd/server/agent.go
new file mode 100644
index 00000000..ffcb4a97
--- /dev/null
+++ b/src/dma/cmd/server/agent.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+ "fmt"
+ "os/exec"
+ "strings"
+)
+
+func createCollectdConf() error {
+ outStatus, errStatus := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "status", "collectd").Output()
+ if errStatus != nil {
+ return fmt.Errorf("status NG")
+ }
+ if !strings.Contains(string(outStatus), "running") {
+ return fmt.Errorf("status not running")
+ }
+
+ _, errStop := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "stop", "collectd").Output()
+ if errStop != nil {
+ return fmt.Errorf("stop NG")
+ }
+
+ _, errStart := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "start", "collectd").Output()
+ if errStart != nil {
+ return fmt.Errorf("start NG")
+ }
+
+ fmt.Println("All complete!")
+
+ return nil
+}
diff --git a/src/dma/cmd/server/amqp.go b/src/dma/cmd/server/amqp.go
new file mode 100644
index 00000000..4d080fe1
--- /dev/null
+++ b/src/dma/cmd/server/amqp.go
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "github.com/streadway/amqp"
+ "log"
+ "os"
+ "strings"
+)
+
+func failOnError(err error, msg string) {
+ if err != nil {
+ log.Fatalf("%s: %s", msg, err)
+ }
+}
+
+func runSubscriber(ctx context.Context, config *Config) {
+ confDirPath := config.Server.CollectdConfDir
+ amqpURL := "amqp://" + config.Server.AmqpUser + ":" + config.Server.AmqpPassword + "@" + config.Server.AmqpHost + ":" + config.Server.AmqpPort + "/"
+ conn, err := amqp.Dial(amqpURL)
+ failOnError(err, "Failed to connect to RabbitMQ")
+
+ defer conn.Close()
+
+ ch, err := conn.Channel()
+ failOnError(err, "Failed to open a channel")
+ defer ch.Close()
+
+ err = ch.ExchangeDeclare(
+ "collectd-conf", // name
+ "fanout", // type
+ false, // durable
+ false, // auto-deleted
+ false, // internal
+ false, // no-wait
+ nil, // arguments
+ )
+ failOnError(err, "Failed to declare an exchange")
+
+ q, err := ch.QueueDeclare(
+ "", // name
+ false, // durable
+ false, // delete when unused
+ true, // exclusive
+ false, // no-wait
+ nil, // arguments
+ )
+ failOnError(err, "Failed to declare a queue")
+
+ err = ch.QueueBind(
+ q.Name, // queue name
+ "", // routing key
+ "collectd-conf", // exchange
+ false,
+ nil)
+ failOnError(err, "Failed to bind a queue")
+
+ msgs, err := ch.Consume(
+ q.Name, // queue
+ "", // consumer
+ true, // auto-ack
+ false, // exclusive
+ false, // no-local
+ false, // no-wait
+ nil, // args
+ )
+ failOnError(err, "Failed to register a consumer")
+
+EVENTLOOP:
+ for {
+ select {
+ case <-ctx.Done():
+ break EVENTLOOP
+ case d, ok := <-msgs:
+ if ok {
+ dataText := strings.SplitN(string(d.Body), "/", 2)
+
+ dst, err := os.Create(confDirPath + "/" + dataText[0])
+ failOnError(err, "File create NG")
+ defer dst.Close()
+
+ dst.Write(([]byte)(dataText[1]))
+
+ err = createCollectdConf()
+ failOnError(err, "collectd conf NG")
+
+ log.Printf(" [x] %s", d.Body)
+ }
+ }
+ }
+
+}
diff --git a/src/dma/cmd/server/api.go b/src/dma/cmd/server/api.go
new file mode 100644
index 00000000..e8add0a1
--- /dev/null
+++ b/src/dma/cmd/server/api.go
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/labstack/echo"
+ "io"
+ "net/http"
+ "os"
+ "time"
+)
+
+func runAPIServer(ctx context.Context, config *Config) {
+ confDirPath := config.Server.CollectdConfDir
+ e := echo.New()
+
+ e.GET("/", func(c echo.Context) error {
+ return c.String(http.StatusOK, "GET OK")
+ })
+ e.POST("/collectd/conf", func(c echo.Context) error {
+
+ file, err := c.FormFile("file")
+ if err != nil {
+ return c.String(http.StatusInternalServerError, "file send NG")
+ }
+
+ src, err := file.Open()
+ if err != nil {
+ return c.String(http.StatusInternalServerError, "file open NG")
+ }
+ defer src.Close()
+
+ dst, err := os.Create(confDirPath + "/" + file.Filename)
+ if err != nil {
+ return c.String(http.StatusInternalServerError, "file create NG")
+ }
+ defer dst.Close()
+
+ // Copy
+ if _, err = io.Copy(dst, src); err != nil {
+ return c.String(http.StatusInternalServerError, "file write NG")
+ }
+
+ err = createCollectdConf()
+ if err != nil {
+ errstr := fmt.Sprintf("collectd conf NG:%v", err)
+ return c.String(http.StatusInternalServerError, errstr)
+ }
+ return c.String(http.StatusCreated, "collectd conf OK")
+ })
+
+ // Start server
+ go func() {
+ urlStr := ":" + config.Server.ListenPort
+ if err := e.Start(urlStr); err != nil {
+ e.Logger.Info("shutting down the server")
+ }
+ }()
+
+ // Wait for context.Done() to gracefully shutdown the server with
+ // a timeout of 10 seconds.
+ <-ctx.Done()
+ ctxShutdown, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ if err := e.Shutdown(ctxShutdown); err != nil {
+ e.Logger.Fatal(err)
+ }
+
+}
diff --git a/src/dma/cmd/server/main.go b/src/dma/cmd/server/main.go
new file mode 100644
index 00000000..2e028fa4
--- /dev/null
+++ b/src/dma/cmd/server/main.go
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "flag"
+ "github.com/BurntSushi/toml"
+ "log"
+ "os"
+ "sync"
+)
+
+var serverTypeOpt = flag.String("type", "both", "server type: both(default), pubsub, rest")
+
+// Config is ...
+type Config struct {
+ Server ServerConfig
+}
+
+// ServerConfig is ...
+type ServerConfig struct {
+ ListenPort string `toml:"listen_port"`
+
+ AmqpHost string `toml:"amqp_host"`
+ AmqpUser string `toml:"amqp_user"`
+ AmqpPassword string `toml:"amqp_password"`
+ AmqpPort string `toml:"amqp_port"`
+
+ CollectdConfDir string `toml:"collectd_confdir"`
+}
+
+func main() {
+
+ var config Config
+ _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config)
+ if err != nil {
+ log.Fatalf("Read error of config file")
+ }
+
+ if f, err := os.Stat(config.Server.CollectdConfDir); os.IsNotExist(err) || !f.IsDir() {
+ log.Fatalf("Path \"%s\" is not a directory", config.Server.CollectdConfDir)
+ }
+
+ var waitgroup sync.WaitGroup
+
+ flag.Parse()
+
+ if *serverTypeOpt == "both" || *serverTypeOpt == "pubsub" {
+ ctx := context.Background()
+ waitgroup.Add(1)
+ go func() {
+ defer waitgroup.Done()
+ runSubscriber(ctx, &config)
+ }()
+ log.Printf("Waiting for publish.")
+ }
+
+ if *serverTypeOpt == "both" || *serverTypeOpt == "rest" {
+ ctx := context.Background()
+ waitgroup.Add(1)
+ go func() {
+ defer waitgroup.Done()
+ runAPIServer(ctx, &config)
+ }()
+ log.Printf("Waiting for REST.")
+ }
+
+ waitgroup.Wait()
+ log.Printf("Server stop.")
+}
diff --git a/src/dma/cmd/threshold/evaluate.go b/src/dma/cmd/threshold/evaluate.go
new file mode 100644
index 00000000..8c961253
--- /dev/null
+++ b/src/dma/cmd/threshold/evaluate.go
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2018 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+func evaluate(config *Config, rdlist []rawData) []evalData {
+ edlist := []evalData{}
+
+ for _, rd := range rdlist {
+ maxVal := 0
+ for _, val := range rd.datalist {
+ if maxVal < val {
+ maxVal = val
+ }
+ }
+
+ if maxVal > config.Threshold.Min {
+ edlist = append(edlist, evalData{rd.key, 1})
+ } else {
+ edlist = append(edlist, evalData{rd.key, 0})
+ }
+ }
+ return edlist
+}
diff --git a/src/dma/cmd/threshold/main.go b/src/dma/cmd/threshold/main.go
new file mode 100644
index 00000000..b98328d1
--- /dev/null
+++ b/src/dma/cmd/threshold/main.go
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2018 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "fmt"
+ "github.com/BurntSushi/toml"
+ "log"
+ "sync"
+ "time"
+)
+
+// Config is ...
+type Config struct {
+ Common CommonConfig
+ Threshold ThresholdConfig
+}
+
+// CommonConfig is ...
+type CommonConfig struct {
+ RedisHost string `toml:"redis_host"`
+ RedisPort string `toml:"redis_port"`
+ RedisPassword string `toml:"redis_password"`
+ RedisDB int `toml:"redis_db"`
+}
+
+// ThresholdConfig is ...
+type ThresholdConfig struct {
+ RedisHost string `toml:"redis_host"`
+ RedisPort string `toml:"redis_port"`
+ RedisPassword string `toml:"redis_password"`
+ RedisDB int `toml:"redis_db"`
+
+ Interval int `toml:"interval"`
+ Min int `toml:"min"`
+
+ CollectdPlugin string `toml:"collectd_plugin"`
+ CollectdType string `toml:"collectd_type"`
+}
+
+type rawData struct {
+ key string
+ datalist []int
+}
+
+type evalData struct {
+ key string
+ label int
+}
+
+func main() {
+ var config Config
+ _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config)
+ if err != nil {
+ log.Fatalf("Read error of config: %s", err)
+ }
+
+ thresConfig := config.Threshold
+ log.Printf("Raw data redis config Addr:%s:%s DB:%d", thresConfig.RedisHost, thresConfig.RedisPort, thresConfig.RedisDB)
+ if thresConfig.RedisPassword == "" {
+ log.Printf("Raw data redis password is not set")
+ }
+ annoConfig := config.Common
+ log.Printf("Annotate redis config Addr:%s:%s DB:%d", annoConfig.RedisHost, annoConfig.RedisPort, annoConfig.RedisDB)
+ if annoConfig.RedisPassword == "" {
+ log.Printf("Annotate redis password is not set")
+ }
+
+ var waitgroup sync.WaitGroup
+ ctx := context.Background()
+
+ waitgroup.Add(1)
+ go func() {
+ defer waitgroup.Done()
+ ticker := time.NewTicker(time.Duration(config.Threshold.Interval) * time.Second)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ result1 := read(&config)
+ // analysis()
+ result2 := evaluate(&config, result1)
+ transmit(&config, result2)
+ }
+ }
+ }()
+
+ waitgroup.Wait()
+
+ fmt.Println("End")
+}
diff --git a/src/dma/cmd/threshold/read.go b/src/dma/cmd/threshold/read.go
new file mode 100644
index 00000000..0b0cf5a1
--- /dev/null
+++ b/src/dma/cmd/threshold/read.go
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2018 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "fmt"
+ "github.com/go-redis/redis"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// e.g. collectd/instance-00000001/virt/if_octets-tapd21acb51-35
+const redisKey = "collectd/*/virt/if_octets-*"
+
+func zrangebyscore(config *Config, client *redis.Client, key string) []int {
+
+ unixNow := int(time.Now().Unix())
+
+ val, err := client.ZRangeByScore(key, redis.ZRangeBy{
+ Min: strconv.Itoa(unixNow - config.Threshold.Interval),
+ Max: strconv.Itoa(unixNow),
+ }).Result()
+
+ datalist := []int{}
+
+ if err == redis.Nil {
+ fmt.Println("this key is not exist")
+ os.Exit(1)
+ } else if err != nil {
+ panic(err)
+ } else {
+ for _, strVal := range val {
+ split := strings.Split(strVal, ":")
+ txVal := split[2]
+ floatVal, err := strconv.ParseFloat(txVal, 64)
+ if err != nil {
+ os.Exit(1)
+ }
+ datalist = append(datalist, int(floatVal))
+ }
+ }
+ return datalist
+}
+
+func read(config *Config) []rawData {
+ thresConfig := config.Threshold
+
+ client := redis.NewClient(&redis.Options{
+ Addr: thresConfig.RedisHost + ":" + thresConfig.RedisPort,
+ Password: thresConfig.RedisPassword,
+ DB: thresConfig.RedisDB,
+ })
+
+ keys, err := client.Keys(redisKey).Result()
+ if err != nil {
+ panic(err)
+ }
+
+ rdlist := []rawData{}
+
+ for _, key := range keys {
+ rdlist = append(rdlist, rawData{key, zrangebyscore(config, client, key)})
+ }
+
+ return rdlist
+}
diff --git a/src/dma/cmd/threshold/transmit.go b/src/dma/cmd/threshold/transmit.go
new file mode 100644
index 00000000..8cac2a88
--- /dev/null
+++ b/src/dma/cmd/threshold/transmit.go
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2018 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "github.com/distributed-monitoring/agent/pkg/common"
+ "github.com/go-redis/redis"
+ "strconv"
+ "strings"
+ "time"
+)
+
+type collectdNotifier struct {
+ pluginName string
+ typeName string
+}
+
+func send(cn collectdNotifier, message string, severity string, metaData [][2]string) error {
+ unixNow := float64(time.Now().UnixNano()) / 1000000000
+
+ var metaDataStr bytes.Buffer
+ for _, data := range metaData {
+ metaDataStr.WriteString(" s:")
+ metaDataStr.WriteString(data[0])
+ metaDataStr.WriteString("=\"")
+ metaDataStr.WriteString(strings.Replace(data[1], "\"", "\\\"", -1))
+ metaDataStr.WriteString("\"")
+ }
+
+ fmt.Printf("PUTNOTIF message=\"%s\" severity=%s time=%f "+
+ "host=localhost plugin=%s type=%s %s\n",
+ message, severity, unixNow, cn.pluginName, cn.typeName, metaDataStr.String())
+
+ return nil
+}
+
+func transmit(config *Config, edlist []evalData) {
+ annoConfig := config.Common
+
+ client := redis.NewClient(&redis.Options{
+ Addr: annoConfig.RedisHost + ":" + annoConfig.RedisPort,
+ Password: annoConfig.RedisPassword,
+ DB: annoConfig.RedisDB,
+ })
+ pool := common.RedisPool{Client: client}
+
+ notifier := collectdNotifier{
+ pluginName: config.Threshold.CollectdPlugin,
+ typeName: config.Threshold.CollectdType}
+
+ for _, ed := range edlist {
+ if ed.label == 1 {
+
+ fmt.Println("kick action")
+
+ item := strings.Split(ed.key, "/")
+ ifItem := strings.SplitN(item[3], "-", 2)
+ virtName := item[1]
+ virtIF := ifItem[1]
+
+ var message bytes.Buffer
+ message.WriteString("Value exceeded threshold ")
+ message.WriteString(strconv.Itoa(config.Threshold.Min))
+ message.WriteString(".")
+
+ nameVal, _ := pool.Get(fmt.Sprintf("%s/%s/vminfo", "vm", virtName))
+ ifVal, _ := pool.Get(fmt.Sprintf("%s/%s/neutron_network", "if", virtIF))
+
+ nameInfo := fmt.Sprintf("{\"%s\": %s}", virtName, nameVal)
+ ifInfo := fmt.Sprintf("{\"%s\": %s}", virtIF, ifVal)
+
+ fmt.Println(nameInfo)
+ fmt.Println(ifInfo)
+
+ send(notifier, message.String(),
+ "warning",
+ [][2]string{{"vminfo", nameInfo}, {"neutron_network", ifInfo}})
+
+ }
+ }
+}