diff options
author | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-06 09:04:29 +0000 |
---|---|---|
committer | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-07 06:03:01 +0000 |
commit | d61931341176dad9ccff7c967a10d88fe54218fa (patch) | |
tree | 526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/cmd | |
parent | 73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff) |
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa
Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/cmd')
-rw-r--r-- | src/dma/cmd/infofetch/daemon.go | 103 | ||||
-rw-r--r-- | src/dma/cmd/infofetch/openstack.go | 522 | ||||
-rw-r--r-- | src/dma/cmd/infofetch/virsh_domain.go | 264 | ||||
-rw-r--r-- | src/dma/cmd/server/agent.go | 31 | ||||
-rw-r--r-- | src/dma/cmd/server/amqp.go | 108 | ||||
-rw-r--r-- | src/dma/cmd/server/api.go | 85 | ||||
-rw-r--r-- | src/dma/cmd/server/main.go | 85 | ||||
-rw-r--r-- | src/dma/cmd/threshold/evaluate.go | 37 | ||||
-rw-r--r-- | src/dma/cmd/threshold/main.go | 107 | ||||
-rw-r--r-- | src/dma/cmd/threshold/read.go | 82 | ||||
-rw-r--r-- | src/dma/cmd/threshold/transmit.go | 97 |
11 files changed, 1521 insertions, 0 deletions
diff --git a/src/dma/cmd/infofetch/daemon.go b/src/dma/cmd/infofetch/daemon.go new file mode 100644 index 00000000..d4ff94f5 --- /dev/null +++ b/src/dma/cmd/infofetch/daemon.go @@ -0,0 +1,103 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "github.com/BurntSushi/toml" + "github.com/distributed-monitoring/agent/pkg/common" + "github.com/go-redis/redis" + libvirt "github.com/libvirt/libvirt-go" + "log" + "sync" +) + +var infoPool common.RedisPool + +// Config is ... +type Config struct { + Common CommonConfig + InfoFetch InfoFetchConfig +} + +// CommonConfig is ... +type CommonConfig struct { + RedisHost string `toml:"redis_host"` + RedisPort string `toml:"redis_port"` + RedisPassword string `toml:"redis_password"` + RedisDB int `toml:"redis_db"` +} + +// InfoFetchConfig is ... +type InfoFetchConfig struct { + OSUsername string `toml:"os_username"` + OSUserDomainName string `toml:"os_user_domain_name"` + OSProjectDomainName string `toml:"os_project_domain_name"` + OSProjectName string `toml:"os_project_name"` + OSPassword string `toml:"os_password"` + OSAuthURL string `toml:"os_auth_url"` +} + +func main() { + + var config Config + _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config) + if err != nil { + log.Println("Read error of config file") + } + + var waitgroup sync.WaitGroup + libvirt.EventRegisterDefaultImpl() + + redisClient := redis.NewClient(&redis.Options{ + Addr: config.Common.RedisHost + ":" + config.Common.RedisPort, + Password: config.Common.RedisPassword, + DB: config.Common.RedisDB, + }) + infoPool = common.RedisPool{Client: redisClient} + // Initialize redis db... + infoPool.DelAll() + + conn, err := libvirt.NewConnect("qemu:///system") + if err != nil { + log.Fatalln("libvirt connect error") + } + defer conn.Close() + + vmIfInfoChan := make(chan string) + { + ctx := context.Background() + waitgroup.Add(1) + go func() { + RunNeutronInfoFetch(ctx, &config, vmIfInfoChan) + waitgroup.Done() + }() + } + + //Get active VM info + GetActiveDomain(conn, vmIfInfoChan) + { + ctx := context.Background() + waitgroup.Add(1) + go func() { + RunVirshEventLoop(ctx, conn, vmIfInfoChan) + waitgroup.Done() + }() + } + + waitgroup.Wait() +} diff --git a/src/dma/cmd/infofetch/openstack.go b/src/dma/cmd/infofetch/openstack.go new file mode 100644 index 00000000..c0c54f5a --- /dev/null +++ b/src/dma/cmd/infofetch/openstack.go @@ -0,0 +1,522 @@ +/* + * Copyright 2017 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net/http" + "net/url" + "os" + "strings" + "text/template" + "time" +) + +var env *InfoFetchConfig + +type userInfo struct { + UserDomainName string + UserName string + Password string + ProjectDomainName string + ProjectName string +} + +var tokenJSONTemplate = `{ + "auth": { + "identity": { + "methods": [ + "password" + ], + "password": { + "user": { + "domain": { + "name": "{{.UserDomainName}}" + }, + "name": "{{.UserName}}", + "password": "{{.Password}}" + } + } + }, + "scope": { + "project": { + "domain": { + "name": "{{.ProjectDomainName}}" + }, + "name": "{{.ProjectName}}" + } + } + } +} +` + +type tokenReply struct { + Token struct { + IsDomain bool `json:"is_domain"` + Methods []string `json:"methods"` + Roles []struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"roles"` + ExpiresAt time.Time `json:"expires_at"` + Project struct { + Domain struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"domain"` + ID string `json:"id"` + Name string `json:"name"` + } `json:"project"` + User struct { + PasswordExpiresAt interface{} `json:"password_expires_at"` + Domain struct { + ID string `json:"id"` + Name string `json:"name"` + } `json:"domain"` + ID string `json:"id"` + Name string `json:"name"` + } `json:"user"` + AuditIds []string `json:"audit_ids"` + IssuedAt time.Time `json:"issued_at"` + } `json:"token"` +} + +type token struct { + Token string + ExpiresAt time.Time +} + +func (t *token) CheckToken() { + now := time.Now() + + if t.ExpiresAt.Sub(now).Seconds() < 30 { + newToken, _ := getToken() + t.Token = newToken.Token + t.ExpiresAt = newToken.ExpiresAt + } +} + +func getToken() (*token, error) { + var buf bytes.Buffer + + t := template.Must(template.New("json template1").Parse(tokenJSONTemplate)) + p := userInfo{ + UserDomainName: env.OSUserDomainName, + UserName: env.OSUsername, + Password: env.OSPassword, + ProjectDomainName: env.OSProjectDomainName, + ProjectName: env.OSProjectName, + } + t.Execute(&buf, p) + + body := bytes.NewReader(buf.Bytes()) + req, err := http.NewRequest("POST", env.OSAuthURL+"/auth/tokens?nocatalog", body) + if err != nil { + return &token{"", time.Unix(0, 0)}, fmt.Errorf("http request failed: %v", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return &token{"", time.Unix(0, 0)}, fmt.Errorf("http POST failed: %v", err) + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + + tokenStr, ok := resp.Header["X-Subject-Token"] + if ok != true && len(tokenStr) != 1 { + return &token{"", time.Unix(0, 0)}, fmt.Errorf("no token in openstack reply") + } + + var repl tokenReply + err = json.Unmarshal(b, &repl) + + return &token{tokenStr[0], repl.Token.ExpiresAt}, nil +} + +type service struct { + Description string `json:"description"` + Links struct { + Self string `json:"self"` + } `json:"links"` + Enabled bool `json:"enabled"` + Type string `json:"type"` + ID string `json:"id"` + Name string `json:"name"` +} + +type serviceListReply struct { + Services []service `json:"services"` +} + +func (s *serviceListReply) GetService(name string) (*service, error) { + for _, v := range s.Services { + if v.Name == name { + return &v, nil + } + } + return nil, fmt.Errorf("no service id (%s) found", name) +} + +type endPoint struct { + RegionID string `json:"region_id"` + Links struct { + Self string `json:"self"` + } `json:"links"` + URL string `json:"url"` + Region string `json:"region"` + Enabled bool `json:"enabled"` + Interface string `json:"interface"` + ServiceID string `json:"service_id"` + ID string `json:"id"` +} + +type endPointReply struct { + Endpoints []endPoint `json:"endpoints"` +} + +func (e *endPointReply) GetEndpoint(serviceid string, ifname string) (*endPoint, error) { + for _, v := range e.Endpoints { + if v.Interface == ifname && v.ServiceID == serviceid { + return &v, nil + } + } + return nil, fmt.Errorf("no endpoint found (%s/%s)", serviceid, ifname) +} + +func getEndpoints(token *token) (endPointReply, error) { + token.CheckToken() + req, err := http.NewRequest("GET", env.OSAuthURL+"/endpoints", nil) + if err != nil { + return endPointReply{}, fmt.Errorf("request failed:%v", err) + } + req.Header.Set("X-Auth-Token", token.Token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return endPointReply{}, fmt.Errorf("http GET failed:%v", err) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + //fmt.Printf("%s", string(b)) + + var repl endPointReply + err = json.Unmarshal(b, &repl) + if err != nil { + return endPointReply{}, fmt.Errorf("http reply decoding failed:%v", err) + } + //fmt.Printf("%v", repl) + return repl, nil +} + +func getServiceList(token *token) (serviceListReply, error) { + token.CheckToken() + req, err := http.NewRequest("GET", env.OSAuthURL+"/services", nil) + if err != nil { + return serviceListReply{}, fmt.Errorf("request failed:%v", err) + } + req.Header.Set("X-Auth-Token", token.Token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return serviceListReply{}, fmt.Errorf("http GET failed:%v", err) + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + + var repl serviceListReply + err = json.Unmarshal(b, &repl) + if err != nil { + return serviceListReply{}, fmt.Errorf("http reply decoding failed:%v", err) + } + return repl, nil +} + +type neutronPort struct { + AllowedAddressPairs []interface{} `json:"allowed_address_pairs"` + ExtraDhcpOpts []interface{} `json:"extra_dhcp_opts"` + UpdatedAt time.Time `json:"updated_at"` + DeviceOwner string `json:"device_owner"` + RevisionNumber int `json:"revision_number"` + PortSecurityEnabled bool `json:"port_security_enabled"` + BindingProfile struct { + } `json:"binding:profile"` + FixedIps []struct { + SubnetID string `json:"subnet_id"` + IPAddress string `json:"ip_address"` + } `json:"fixed_ips"` + ID string `json:"id"` + SecurityGroups []interface{} `json:"security_groups"` + BindingVifDetails struct { + PortFilter bool `json:"port_filter"` + DatapathType string `json:"datapath_type"` + OvsHybridPlug bool `json:"ovs_hybrid_plug"` + } `json:"binding:vif_details"` + BindingVifType string `json:"binding:vif_type"` + MacAddress string `json:"mac_address"` + ProjectID string `json:"project_id"` + Status string `json:"status"` + BindingHostID string `json:"binding:host_id"` + Description string `json:"description"` + Tags []interface{} `json:"tags"` + QosPolicyID interface{} `json:"qos_policy_id"` + Name string `json:"name"` + AdminStateUp bool `json:"admin_state_up"` + NetworkID string `json:"network_id"` + TenantID string `json:"tenant_id"` + CreatedAt time.Time `json:"created_at"` + BindingVnicType string `json:"binding:vnic_type"` + DeviceID string `json:"device_id"` +} + +type neutronPortReply struct { + Ports []neutronPort `json:"ports"` +} + +func getNeutronPorts(token *token, endpoint string) (neutronPortReply, error) { + token.CheckToken() + req, err := http.NewRequest("GET", endpoint+"/v2.0/ports", nil) + if err != nil { + return neutronPortReply{}, fmt.Errorf("request failed:%v", err) + } + req.Header.Set("X-Auth-Token", token.Token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return neutronPortReply{}, fmt.Errorf("http GET failed:%v", err) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + + var repl neutronPortReply + err = json.Unmarshal(b, &repl) + if err != nil { + return neutronPortReply{}, fmt.Errorf("http reply decoding failed:%v", err) + } + return repl, nil +} + +func (n *neutronPortReply) GetNeutronPortfromMAC(mac string) (*neutronPort, + error) { + for _, v := range n.Ports { + if v.MacAddress == strings.ToLower(mac) { + return &v, nil + } + } + return nil, fmt.Errorf("no port (%s) found", mac) +} + +type neutronNetwork struct { + ProviderPhysicalNetwork string `json:"provider:physical_network"` + Ipv6AddressScope interface{} `json:"ipv6_address_scope"` + RevisionNumber int `json:"revision_number"` + PortSecurityEnabled bool `json:"port_security_enabled"` + Mtu int `json:"mtu"` + ID string `json:"id"` + RouterExternal bool `json:"router:external"` + AvailabilityZoneHints []interface{} `json:"availability_zone_hints"` + AvailabilityZones []string `json:"availability_zones"` + ProviderSegmentationID interface{} `json:"provider:segmentation_id"` + Ipv4AddressScope interface{} `json:"ipv4_address_scope"` + Shared bool `json:"shared"` + ProjectID string `json:"project_id"` + Status string `json:"status"` + Subnets []string `json:"subnets"` + Description string `json:"description"` + Tags []interface{} `json:"tags"` + UpdatedAt time.Time `json:"updated_at"` + IsDefault bool `json:"is_default"` + QosPolicyID interface{} `json:"qos_policy_id"` + Name string `json:"name"` + AdminStateUp bool `json:"admin_state_up"` + TenantID string `json:"tenant_id"` + CreatedAt time.Time `json:"created_at"` + ProviderNetworkType string `json:"provider:network_type"` +} + +type neutronNetworkReply struct { + Networks []neutronNetwork `json:"networks"` +} + +func (n *neutronNetworkReply) GetNetworkFromID(netid string) (*neutronNetwork, error) { + for _, v := range n.Networks { + if v.ID == netid { + return &v, nil + } + } + return nil, fmt.Errorf("no network (%s) found", netid) +} + +func getNetworkReply(token *token, endpoint string) (neutronNetworkReply, error) { + token.CheckToken() + req, err := http.NewRequest("GET", endpoint+"/v2.0/networks", nil) + if err != nil { + return neutronNetworkReply{}, fmt.Errorf("request failed:%v", err) + } + req.Header.Set("X-Auth-Token", token.Token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return neutronNetworkReply{}, fmt.Errorf("http GET failed:%v", err) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + + var repl neutronNetworkReply + err = json.Unmarshal(b, &repl) + if err != nil { + return neutronNetworkReply{}, fmt.Errorf("http reply decoding failed:%v", err) + } + return repl, nil +} + +type novaCompute struct { + ID string `json:"id"` + Links []struct { + Href string `json:"href"` + Rel string `json:"rel"` + } `json:"links"` + Name string `json:"name"` +} + +type novaComputeReply struct { + Servers []novaCompute `json:"servers"` +} + +func (n *novaComputeReply) GetComputeFromID(vmid string) (*novaCompute, error) { + for _, v := range n.Servers { + if v.ID == vmid { + return &v, nil + } + } + return nil, fmt.Errorf("no vm (%s) found", vmid) +} + +func getComputeReply(token *token, endpoint string) (novaComputeReply, error) { + token.CheckToken() + values := url.Values{} + values.Add("all_tenants", "1") + + req, err := http.NewRequest("GET", endpoint+"/servers", nil) + if err != nil { + return novaComputeReply{}, fmt.Errorf("request failed:%v", err) + } + req.Header.Set("X-Auth-Token", token.Token) + req.URL.RawQuery = values.Encode() + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return novaComputeReply{}, fmt.Errorf("http GET failed:%v", err) + } + defer resp.Body.Close() + + b, err := ioutil.ReadAll(resp.Body) + + var repl novaComputeReply + err = json.Unmarshal(b, &repl) + if err != nil { + return novaComputeReply{}, fmt.Errorf("http reply decoding failed:%v", err) + } + + return repl, nil +} + +type osNeutronInterfaceAnnotation struct { + IfName string + VMName string + NetworkName string +} + +// RunNeutronInfoFetch gets redis key update from libvirt and get network information +// from Neutron with REST api. The retrieved information is stored under redis, +// if/<tap name>/neutron_network +func RunNeutronInfoFetch(ctx context.Context, config *Config, vmIfInfo chan string) error { + env = &config.InfoFetch + token, err := getToken() + + if err != nil { + fmt.Fprintf(os.Stderr, "cannot get token: %v", err) + return err + } + + svc, _ := getServiceList(token) + neuID, _ := svc.GetService("neutron") + //fmt.Printf("neutron id:%s\n", id.ID) + + novaID, _ := svc.GetService("nova") + //fmt.Printf("nova id:%s\n", id.ID) + + endpoints, _ := getEndpoints(token) + neuEndpoint, _ := endpoints.GetEndpoint(neuID.ID, "admin") + //fmt.Printf("neutron endpoint:%s\n", neuEndpoint.URL) + + novaEndpoint, _ := endpoints.GetEndpoint(novaID.ID, "admin") + //fmt.Printf("nova endpoint:%s\n", novaEndpoint.URL) + + getComputeReply(token, novaEndpoint.URL) + getNeutronPorts(token, neuEndpoint.URL) + //vmrepl, _ := getComputeReply(token, novaEndpoint.URL) + //prepl, _ := getNeutronPorts(token, neuEndpoint.URL) + +EVENTLOOP: + for { + select { + case <-ctx.Done(): + break EVENTLOOP + case key := <-vmIfInfo: + log.Printf("Incoming IF: %v", key) + libvirtIfInfo, err := infoPool.Get(key) + if err != nil { + log.Fatalf("Err: %v", err) + } else { + var ifInfo osVMInterfaceAnnotation + err = json.Unmarshal([]byte(libvirtIfInfo), &ifInfo) + if err != nil { + log.Fatalf("Err: %v", err) + } else { + vmrepl, _ := getComputeReply(token, novaEndpoint.URL) + prepl, _ := getNeutronPorts(token, neuEndpoint.URL) + nrepl, _ := getNetworkReply(token, neuEndpoint.URL) + netid, _ := prepl.GetNeutronPortfromMAC(ifInfo.MacAddr) + net, _ := nrepl.GetNetworkFromID(netid.NetworkID) + vm, _ := vmrepl.GetComputeFromID(netid.DeviceID) + osIfInfo := osNeutronInterfaceAnnotation{ + IfName: ifInfo.Target, + VMName: vm.Name, + NetworkName: net.Name} + + osIfInfoJSON, err := json.Marshal(osIfInfo) + if err != nil { + log.Fatalf("Err: %v", err) + } else { + log.Printf("Get: vmname: %s / networkname:%s", vm.Name, net.Name) + infoPool.Set(fmt.Sprintf("if/%s/%s", ifInfo.Target, "neutron_network"), string(osIfInfoJSON)) + } + } + } + } + } + return nil +} diff --git a/src/dma/cmd/infofetch/virsh_domain.go b/src/dma/cmd/infofetch/virsh_domain.go new file mode 100644 index 00000000..b79f5bdd --- /dev/null +++ b/src/dma/cmd/infofetch/virsh_domain.go @@ -0,0 +1,264 @@ +/* + * Copyright 2017 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "encoding/json" + "encoding/xml" + "fmt" + libvirt "github.com/libvirt/libvirt-go" + "log" +) + +type instance struct { + Name string `xml:"name"` + Owner struct { + User string `xml:"user"` + Project string `xml:"project"` + } `xml:"owner"` + Flavor struct { + Name string `xml:"name,attr"` + } `xml:"flavor"` +} + +type domain struct { + Name string `xml:"name"` + Devices struct { + Interfaces []struct { + Type string `xml:"type,attr"` + Mac struct { + Address string `xml:"address,attr"` + } `xml:"mac"` + Target struct { + Dev string `xml:"dev,attr"` + } `xml:"target"` + } `xml:"interface"` + } `xml:"devices"` +} + +type osVMAnnotation struct { + Name string + Owner string + Project string + Flavor string +} + +type osVMInterfaceAnnotation struct { + Type string + MacAddr string + Target string + VMName string +} + +func parseNovaMetadata(metadata string) (*osVMAnnotation, error) { + data := new(instance) + + if err := xml.Unmarshal([]byte(metadata), data); err != nil { + log.Println("XML Unmarshal error:", err) + return nil, err + } + log.Printf("Get name: %s user: %s, project: %s, flavor: %s", data.Name, data.Owner.User, data.Owner.Project, data.Flavor.Name) + return &osVMAnnotation{ + Name: data.Name, + Owner: data.Owner.User, + Project: data.Owner.Project, + Flavor: data.Flavor.Name}, nil +} + +func parseXMLForMAC(dumpxml string) (*[]osVMInterfaceAnnotation, error) { + data := new(domain) + + if err := xml.Unmarshal([]byte(dumpxml), data); err != nil { + log.Println("XML Unmarshal error:", err) + return nil, err + } + + ifAnnotation := make([]osVMInterfaceAnnotation, len(data.Devices.Interfaces)) + for i, v := range data.Devices.Interfaces { + log.Printf("Interface type: %s, mac_addr: %s, target_dev: %s", v.Type, v.Mac.Address, v.Target.Dev) + ifAnnotation[i] = osVMInterfaceAnnotation{ + Type: v.Type, + MacAddr: v.Mac.Address, + Target: v.Target.Dev, + VMName: data.Name} + } + return &ifAnnotation, nil +} + +func setInterfaceAnnotation(ifInfo *[]osVMInterfaceAnnotation, vmIfInfoChan chan string) { + for _, v := range *ifInfo { + ifInfoJSON, err := json.Marshal(v) + if err != nil { + log.Fatalf("Err: %v", err) + } + infoPool.Set(fmt.Sprintf("if/%s/%s", v.Target, "network"), string(ifInfoJSON)) + + vmIfInfoChan <- fmt.Sprintf("if/%s/%s", v.Target, "network") + } + return +} + +func domainEventLifecycleCallback(vmIfInfo chan string) func(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventLifecycle) { + + return func(c *libvirt.Connect, + d *libvirt.Domain, event *libvirt.DomainEventLifecycle) { + domName, _ := d.GetName() + + switch event.Event { + case libvirt.DOMAIN_EVENT_DEFINED: + // VM defined: vmname (libvirt, nova), user, project, flavor + // Redis: <vnname>/vminfo + log.Printf("Event defined: domName: %s, event: %v", domName, event) + metadata, err := d.GetMetadata(libvirt.DOMAIN_METADATA_ELEMENT, "http://openstack.org/xmlns/libvirt/nova/1.0", libvirt.DOMAIN_AFFECT_CONFIG) + if err != nil { + log.Fatalf("Err: %v", err) + } + vmInfo, err := parseNovaMetadata(metadata) + if err != nil { + log.Fatalf("Err: %v", err) + } + vmInfoJSON, err := json.Marshal(vmInfo) + if err != nil { + log.Fatalf("Err: %v", err) + } + infoPool.Set(fmt.Sprintf("vm/%s/%s", domName, "vminfo"), string(vmInfoJSON)) + case libvirt.DOMAIN_EVENT_STARTED: + // VM started: interface type, interface mac addr, intarface type + // Redis: <vnname>/interfaces + log.Printf("Event started: domName: %s, event: %v", domName, event) + + xml, err := d.GetXMLDesc(0) + if err != nil { + log.Fatalf("Err: %v", err) + } + ifInfo, err := parseXMLForMAC(xml) + if err != nil { + log.Fatalf("Err: %v", err) + } + setInterfaceAnnotation(ifInfo, vmIfInfo) + + ifInfoJSON, err := json.Marshal(ifInfo) + if err != nil { + log.Fatalf("Err: %v", err) + } + infoPool.Set(fmt.Sprintf("vm/%s/%s", domName, "interfaces"), string(ifInfoJSON)) + case libvirt.DOMAIN_EVENT_UNDEFINED: + log.Printf("Event undefined: domName: %s, event: %v", domName, event) + vmIFInfo, err := infoPool.Get(fmt.Sprintf("vm/%s/%s", domName, "interfaces")) + if err != nil { + log.Fatalf("Err: %v", err) + } else { + var interfaces []osVMInterfaceAnnotation + err = json.Unmarshal([]byte(vmIFInfo), &interfaces) + if err != nil { + log.Fatalf("Err: %v", err) + } else { + for _, v := range interfaces { + infoPool.Del(fmt.Sprintf("if/%s/%s", v.Target, "network")) + infoPool.Del(fmt.Sprintf("if/%s/%s", v.Target, "neutron_network")) + } + } + } + infoPool.Del(fmt.Sprintf("vm/%s/%s", domName, "vminfo")) + infoPool.Del(fmt.Sprintf("vm/%s/%s", domName, "interfaces")) + default: + log.Printf("Event misc: domName: %s, event: %v", domName, event) + } + } +} + +// GetActiveDomain gets all active domain information from libvirt and it should be called at startup to get +// current running domain information +func GetActiveDomain(conn *libvirt.Connect, vmIfInfoChan chan string) error { + doms, err := conn.ListAllDomains(libvirt.CONNECT_LIST_DOMAINS_ACTIVE) + if err != nil { + log.Fatalf("libvirt dom list error: %s", err) + return err + } + + for _, d := range doms { + name, err := d.GetName() + + // Get VM Info + metadata, err := d.GetMetadata(libvirt.DOMAIN_METADATA_ELEMENT, "http://openstack.org/xmlns/libvirt/nova/1.0", libvirt.DOMAIN_AFFECT_CONFIG) + if err != nil { + log.Fatalf("Err: %v", err) + return err + } + vmInfo, err := parseNovaMetadata(metadata) + if err != nil { + log.Fatalf("Err: %v", err) + return err + } + vmInfoJSON, err := json.Marshal(vmInfo) + if err != nil { + log.Fatalf("Err: %v", err) + return err + } + infoPool.Set(fmt.Sprintf("vm/%s/%s", name, "vminfo"), string(vmInfoJSON)) + + // Get Network info + xml, err := d.GetXMLDesc(0) + if err != nil { + log.Fatalf("Err: %v", err) + return err + } + ifInfo, err := parseXMLForMAC(xml) + if err != nil { + log.Fatalf("Err: %v", err) + return err + } + setInterfaceAnnotation(ifInfo, vmIfInfoChan) + + ifInfoJSON, err := json.Marshal(ifInfo) + if err != nil { + log.Fatalf("Err: %v", err) + return err + } + infoPool.Set(fmt.Sprintf("vm/%s/%s", name, "interfaces"), string(ifInfoJSON)) + } + return nil +} + +// RunVirshEventLoop is event loop to watch libvirt update +func RunVirshEventLoop(ctx context.Context, conn *libvirt.Connect, vmIfInfoChan chan string) error { + callbackID, err := conn.DomainEventLifecycleRegister(nil, domainEventLifecycleCallback(vmIfInfoChan)) + if err != nil { + log.Fatalf("Err: callbackid: %d %v", callbackID, err) + } + + libvirt.EventAddTimeout(5000, func(timer int) { return }) // 5000 = 5sec + log.Printf("Entering libvirt event loop()") +EVENTLOOP: + for { + select { + case <-ctx.Done(): + break EVENTLOOP + default: + if err := libvirt.EventRunDefaultImpl(); err != nil { + log.Fatalf("%v", err) + } + } + } + log.Printf("Quitting libvirt event loop()") + + if err := conn.DomainEventDeregister(callbackID); err != nil { + log.Fatalf("%v", err) + } + return nil +} diff --git a/src/dma/cmd/server/agent.go b/src/dma/cmd/server/agent.go new file mode 100644 index 00000000..ffcb4a97 --- /dev/null +++ b/src/dma/cmd/server/agent.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "os/exec" + "strings" +) + +func createCollectdConf() error { + outStatus, errStatus := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "status", "collectd").Output() + if errStatus != nil { + return fmt.Errorf("status NG") + } + if !strings.Contains(string(outStatus), "running") { + return fmt.Errorf("status not running") + } + + _, errStop := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "stop", "collectd").Output() + if errStop != nil { + return fmt.Errorf("stop NG") + } + + _, errStart := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "start", "collectd").Output() + if errStart != nil { + return fmt.Errorf("start NG") + } + + fmt.Println("All complete!") + + return nil +} diff --git a/src/dma/cmd/server/amqp.go b/src/dma/cmd/server/amqp.go new file mode 100644 index 00000000..4d080fe1 --- /dev/null +++ b/src/dma/cmd/server/amqp.go @@ -0,0 +1,108 @@ +/* + * Copyright 2017 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "github.com/streadway/amqp" + "log" + "os" + "strings" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} + +func runSubscriber(ctx context.Context, config *Config) { + confDirPath := config.Server.CollectdConfDir + amqpURL := "amqp://" + config.Server.AmqpUser + ":" + config.Server.AmqpPassword + "@" + config.Server.AmqpHost + ":" + config.Server.AmqpPort + "/" + conn, err := amqp.Dial(amqpURL) + failOnError(err, "Failed to connect to RabbitMQ") + + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + err = ch.ExchangeDeclare( + "collectd-conf", // name + "fanout", // type + false, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare an exchange") + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "collectd-conf", // exchange + false, + nil) + failOnError(err, "Failed to bind a queue") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + +EVENTLOOP: + for { + select { + case <-ctx.Done(): + break EVENTLOOP + case d, ok := <-msgs: + if ok { + dataText := strings.SplitN(string(d.Body), "/", 2) + + dst, err := os.Create(confDirPath + "/" + dataText[0]) + failOnError(err, "File create NG") + defer dst.Close() + + dst.Write(([]byte)(dataText[1])) + + err = createCollectdConf() + failOnError(err, "collectd conf NG") + + log.Printf(" [x] %s", d.Body) + } + } + } + +} diff --git a/src/dma/cmd/server/api.go b/src/dma/cmd/server/api.go new file mode 100644 index 00000000..e8add0a1 --- /dev/null +++ b/src/dma/cmd/server/api.go @@ -0,0 +1,85 @@ +/* + * Copyright 2017 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "github.com/labstack/echo" + "io" + "net/http" + "os" + "time" +) + +func runAPIServer(ctx context.Context, config *Config) { + confDirPath := config.Server.CollectdConfDir + e := echo.New() + + e.GET("/", func(c echo.Context) error { + return c.String(http.StatusOK, "GET OK") + }) + e.POST("/collectd/conf", func(c echo.Context) error { + + file, err := c.FormFile("file") + if err != nil { + return c.String(http.StatusInternalServerError, "file send NG") + } + + src, err := file.Open() + if err != nil { + return c.String(http.StatusInternalServerError, "file open NG") + } + defer src.Close() + + dst, err := os.Create(confDirPath + "/" + file.Filename) + if err != nil { + return c.String(http.StatusInternalServerError, "file create NG") + } + defer dst.Close() + + // Copy + if _, err = io.Copy(dst, src); err != nil { + return c.String(http.StatusInternalServerError, "file write NG") + } + + err = createCollectdConf() + if err != nil { + errstr := fmt.Sprintf("collectd conf NG:%v", err) + return c.String(http.StatusInternalServerError, errstr) + } + return c.String(http.StatusCreated, "collectd conf OK") + }) + + // Start server + go func() { + urlStr := ":" + config.Server.ListenPort + if err := e.Start(urlStr); err != nil { + e.Logger.Info("shutting down the server") + } + }() + + // Wait for context.Done() to gracefully shutdown the server with + // a timeout of 10 seconds. + <-ctx.Done() + ctxShutdown, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := e.Shutdown(ctxShutdown); err != nil { + e.Logger.Fatal(err) + } + +} diff --git a/src/dma/cmd/server/main.go b/src/dma/cmd/server/main.go new file mode 100644 index 00000000..2e028fa4 --- /dev/null +++ b/src/dma/cmd/server/main.go @@ -0,0 +1,85 @@ +/* + * Copyright 2017 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "flag" + "github.com/BurntSushi/toml" + "log" + "os" + "sync" +) + +var serverTypeOpt = flag.String("type", "both", "server type: both(default), pubsub, rest") + +// Config is ... +type Config struct { + Server ServerConfig +} + +// ServerConfig is ... +type ServerConfig struct { + ListenPort string `toml:"listen_port"` + + AmqpHost string `toml:"amqp_host"` + AmqpUser string `toml:"amqp_user"` + AmqpPassword string `toml:"amqp_password"` + AmqpPort string `toml:"amqp_port"` + + CollectdConfDir string `toml:"collectd_confdir"` +} + +func main() { + + var config Config + _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config) + if err != nil { + log.Fatalf("Read error of config file") + } + + if f, err := os.Stat(config.Server.CollectdConfDir); os.IsNotExist(err) || !f.IsDir() { + log.Fatalf("Path \"%s\" is not a directory", config.Server.CollectdConfDir) + } + + var waitgroup sync.WaitGroup + + flag.Parse() + + if *serverTypeOpt == "both" || *serverTypeOpt == "pubsub" { + ctx := context.Background() + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + runSubscriber(ctx, &config) + }() + log.Printf("Waiting for publish.") + } + + if *serverTypeOpt == "both" || *serverTypeOpt == "rest" { + ctx := context.Background() + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + runAPIServer(ctx, &config) + }() + log.Printf("Waiting for REST.") + } + + waitgroup.Wait() + log.Printf("Server stop.") +} diff --git a/src/dma/cmd/threshold/evaluate.go b/src/dma/cmd/threshold/evaluate.go new file mode 100644 index 00000000..8c961253 --- /dev/null +++ b/src/dma/cmd/threshold/evaluate.go @@ -0,0 +1,37 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +func evaluate(config *Config, rdlist []rawData) []evalData { + edlist := []evalData{} + + for _, rd := range rdlist { + maxVal := 0 + for _, val := range rd.datalist { + if maxVal < val { + maxVal = val + } + } + + if maxVal > config.Threshold.Min { + edlist = append(edlist, evalData{rd.key, 1}) + } else { + edlist = append(edlist, evalData{rd.key, 0}) + } + } + return edlist +} diff --git a/src/dma/cmd/threshold/main.go b/src/dma/cmd/threshold/main.go new file mode 100644 index 00000000..b98328d1 --- /dev/null +++ b/src/dma/cmd/threshold/main.go @@ -0,0 +1,107 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "github.com/BurntSushi/toml" + "log" + "sync" + "time" +) + +// Config is ... +type Config struct { + Common CommonConfig + Threshold ThresholdConfig +} + +// CommonConfig is ... +type CommonConfig struct { + RedisHost string `toml:"redis_host"` + RedisPort string `toml:"redis_port"` + RedisPassword string `toml:"redis_password"` + RedisDB int `toml:"redis_db"` +} + +// ThresholdConfig is ... +type ThresholdConfig struct { + RedisHost string `toml:"redis_host"` + RedisPort string `toml:"redis_port"` + RedisPassword string `toml:"redis_password"` + RedisDB int `toml:"redis_db"` + + Interval int `toml:"interval"` + Min int `toml:"min"` + + CollectdPlugin string `toml:"collectd_plugin"` + CollectdType string `toml:"collectd_type"` +} + +type rawData struct { + key string + datalist []int +} + +type evalData struct { + key string + label int +} + +func main() { + var config Config + _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config) + if err != nil { + log.Fatalf("Read error of config: %s", err) + } + + thresConfig := config.Threshold + log.Printf("Raw data redis config Addr:%s:%s DB:%d", thresConfig.RedisHost, thresConfig.RedisPort, thresConfig.RedisDB) + if thresConfig.RedisPassword == "" { + log.Printf("Raw data redis password is not set") + } + annoConfig := config.Common + log.Printf("Annotate redis config Addr:%s:%s DB:%d", annoConfig.RedisHost, annoConfig.RedisPort, annoConfig.RedisDB) + if annoConfig.RedisPassword == "" { + log.Printf("Annotate redis password is not set") + } + + var waitgroup sync.WaitGroup + ctx := context.Background() + + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + ticker := time.NewTicker(time.Duration(config.Threshold.Interval) * time.Second) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + result1 := read(&config) + // analysis() + result2 := evaluate(&config, result1) + transmit(&config, result2) + } + } + }() + + waitgroup.Wait() + + fmt.Println("End") +} diff --git a/src/dma/cmd/threshold/read.go b/src/dma/cmd/threshold/read.go new file mode 100644 index 00000000..0b0cf5a1 --- /dev/null +++ b/src/dma/cmd/threshold/read.go @@ -0,0 +1,82 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "fmt" + "github.com/go-redis/redis" + "os" + "strconv" + "strings" + "time" +) + +// e.g. collectd/instance-00000001/virt/if_octets-tapd21acb51-35 +const redisKey = "collectd/*/virt/if_octets-*" + +func zrangebyscore(config *Config, client *redis.Client, key string) []int { + + unixNow := int(time.Now().Unix()) + + val, err := client.ZRangeByScore(key, redis.ZRangeBy{ + Min: strconv.Itoa(unixNow - config.Threshold.Interval), + Max: strconv.Itoa(unixNow), + }).Result() + + datalist := []int{} + + if err == redis.Nil { + fmt.Println("this key is not exist") + os.Exit(1) + } else if err != nil { + panic(err) + } else { + for _, strVal := range val { + split := strings.Split(strVal, ":") + txVal := split[2] + floatVal, err := strconv.ParseFloat(txVal, 64) + if err != nil { + os.Exit(1) + } + datalist = append(datalist, int(floatVal)) + } + } + return datalist +} + +func read(config *Config) []rawData { + thresConfig := config.Threshold + + client := redis.NewClient(&redis.Options{ + Addr: thresConfig.RedisHost + ":" + thresConfig.RedisPort, + Password: thresConfig.RedisPassword, + DB: thresConfig.RedisDB, + }) + + keys, err := client.Keys(redisKey).Result() + if err != nil { + panic(err) + } + + rdlist := []rawData{} + + for _, key := range keys { + rdlist = append(rdlist, rawData{key, zrangebyscore(config, client, key)}) + } + + return rdlist +} diff --git a/src/dma/cmd/threshold/transmit.go b/src/dma/cmd/threshold/transmit.go new file mode 100644 index 00000000..8cac2a88 --- /dev/null +++ b/src/dma/cmd/threshold/transmit.go @@ -0,0 +1,97 @@ +/* + * Copyright 2018 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "fmt" + "github.com/distributed-monitoring/agent/pkg/common" + "github.com/go-redis/redis" + "strconv" + "strings" + "time" +) + +type collectdNotifier struct { + pluginName string + typeName string +} + +func send(cn collectdNotifier, message string, severity string, metaData [][2]string) error { + unixNow := float64(time.Now().UnixNano()) / 1000000000 + + var metaDataStr bytes.Buffer + for _, data := range metaData { + metaDataStr.WriteString(" s:") + metaDataStr.WriteString(data[0]) + metaDataStr.WriteString("=\"") + metaDataStr.WriteString(strings.Replace(data[1], "\"", "\\\"", -1)) + metaDataStr.WriteString("\"") + } + + fmt.Printf("PUTNOTIF message=\"%s\" severity=%s time=%f "+ + "host=localhost plugin=%s type=%s %s\n", + message, severity, unixNow, cn.pluginName, cn.typeName, metaDataStr.String()) + + return nil +} + +func transmit(config *Config, edlist []evalData) { + annoConfig := config.Common + + client := redis.NewClient(&redis.Options{ + Addr: annoConfig.RedisHost + ":" + annoConfig.RedisPort, + Password: annoConfig.RedisPassword, + DB: annoConfig.RedisDB, + }) + pool := common.RedisPool{Client: client} + + notifier := collectdNotifier{ + pluginName: config.Threshold.CollectdPlugin, + typeName: config.Threshold.CollectdType} + + for _, ed := range edlist { + if ed.label == 1 { + + fmt.Println("kick action") + + item := strings.Split(ed.key, "/") + ifItem := strings.SplitN(item[3], "-", 2) + virtName := item[1] + virtIF := ifItem[1] + + var message bytes.Buffer + message.WriteString("Value exceeded threshold ") + message.WriteString(strconv.Itoa(config.Threshold.Min)) + message.WriteString(".") + + nameVal, _ := pool.Get(fmt.Sprintf("%s/%s/vminfo", "vm", virtName)) + ifVal, _ := pool.Get(fmt.Sprintf("%s/%s/neutron_network", "if", virtIF)) + + nameInfo := fmt.Sprintf("{\"%s\": %s}", virtName, nameVal) + ifInfo := fmt.Sprintf("{\"%s\": %s}", virtIF, ifVal) + + fmt.Println(nameInfo) + fmt.Println(ifInfo) + + send(notifier, message.String(), + "warning", + [][2]string{{"vminfo", nameInfo}, {"neutron_network", ifInfo}}) + + } + } +} |