aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/cmd/infofetch
diff options
context:
space:
mode:
authorToshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>2018-09-06 09:04:29 +0000
committerToshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>2018-09-07 06:03:01 +0000
commitd61931341176dad9ccff7c967a10d88fe54218fa (patch)
tree526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/cmd/infofetch
parent73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff)
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/cmd/infofetch')
-rw-r--r--src/dma/cmd/infofetch/daemon.go103
-rw-r--r--src/dma/cmd/infofetch/openstack.go522
-rw-r--r--src/dma/cmd/infofetch/virsh_domain.go264
3 files changed, 889 insertions, 0 deletions
diff --git a/src/dma/cmd/infofetch/daemon.go b/src/dma/cmd/infofetch/daemon.go
new file mode 100644
index 00000000..d4ff94f5
--- /dev/null
+++ b/src/dma/cmd/infofetch/daemon.go
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "github.com/BurntSushi/toml"
+ "github.com/distributed-monitoring/agent/pkg/common"
+ "github.com/go-redis/redis"
+ libvirt "github.com/libvirt/libvirt-go"
+ "log"
+ "sync"
+)
+
+var infoPool common.RedisPool
+
+// Config is ...
+type Config struct {
+ Common CommonConfig
+ InfoFetch InfoFetchConfig
+}
+
+// CommonConfig is ...
+type CommonConfig struct {
+ RedisHost string `toml:"redis_host"`
+ RedisPort string `toml:"redis_port"`
+ RedisPassword string `toml:"redis_password"`
+ RedisDB int `toml:"redis_db"`
+}
+
+// InfoFetchConfig is ...
+type InfoFetchConfig struct {
+ OSUsername string `toml:"os_username"`
+ OSUserDomainName string `toml:"os_user_domain_name"`
+ OSProjectDomainName string `toml:"os_project_domain_name"`
+ OSProjectName string `toml:"os_project_name"`
+ OSPassword string `toml:"os_password"`
+ OSAuthURL string `toml:"os_auth_url"`
+}
+
+func main() {
+
+ var config Config
+ _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config)
+ if err != nil {
+ log.Println("Read error of config file")
+ }
+
+ var waitgroup sync.WaitGroup
+ libvirt.EventRegisterDefaultImpl()
+
+ redisClient := redis.NewClient(&redis.Options{
+ Addr: config.Common.RedisHost + ":" + config.Common.RedisPort,
+ Password: config.Common.RedisPassword,
+ DB: config.Common.RedisDB,
+ })
+ infoPool = common.RedisPool{Client: redisClient}
+ // Initialize redis db...
+ infoPool.DelAll()
+
+ conn, err := libvirt.NewConnect("qemu:///system")
+ if err != nil {
+ log.Fatalln("libvirt connect error")
+ }
+ defer conn.Close()
+
+ vmIfInfoChan := make(chan string)
+ {
+ ctx := context.Background()
+ waitgroup.Add(1)
+ go func() {
+ RunNeutronInfoFetch(ctx, &config, vmIfInfoChan)
+ waitgroup.Done()
+ }()
+ }
+
+ //Get active VM info
+ GetActiveDomain(conn, vmIfInfoChan)
+ {
+ ctx := context.Background()
+ waitgroup.Add(1)
+ go func() {
+ RunVirshEventLoop(ctx, conn, vmIfInfoChan)
+ waitgroup.Done()
+ }()
+ }
+
+ waitgroup.Wait()
+}
diff --git a/src/dma/cmd/infofetch/openstack.go b/src/dma/cmd/infofetch/openstack.go
new file mode 100644
index 00000000..c0c54f5a
--- /dev/null
+++ b/src/dma/cmd/infofetch/openstack.go
@@ -0,0 +1,522 @@
+/*
+ * Copyright 2017 Red Hat
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/url"
+ "os"
+ "strings"
+ "text/template"
+ "time"
+)
+
+var env *InfoFetchConfig
+
+type userInfo struct {
+ UserDomainName string
+ UserName string
+ Password string
+ ProjectDomainName string
+ ProjectName string
+}
+
+var tokenJSONTemplate = `{
+ "auth": {
+ "identity": {
+ "methods": [
+ "password"
+ ],
+ "password": {
+ "user": {
+ "domain": {
+ "name": "{{.UserDomainName}}"
+ },
+ "name": "{{.UserName}}",
+ "password": "{{.Password}}"
+ }
+ }
+ },
+ "scope": {
+ "project": {
+ "domain": {
+ "name": "{{.ProjectDomainName}}"
+ },
+ "name": "{{.ProjectName}}"
+ }
+ }
+ }
+}
+`
+
+type tokenReply struct {
+ Token struct {
+ IsDomain bool `json:"is_domain"`
+ Methods []string `json:"methods"`
+ Roles []struct {
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"roles"`
+ ExpiresAt time.Time `json:"expires_at"`
+ Project struct {
+ Domain struct {
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"domain"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"project"`
+ User struct {
+ PasswordExpiresAt interface{} `json:"password_expires_at"`
+ Domain struct {
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"domain"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+ } `json:"user"`
+ AuditIds []string `json:"audit_ids"`
+ IssuedAt time.Time `json:"issued_at"`
+ } `json:"token"`
+}
+
+type token struct {
+ Token string
+ ExpiresAt time.Time
+}
+
+func (t *token) CheckToken() {
+ now := time.Now()
+
+ if t.ExpiresAt.Sub(now).Seconds() < 30 {
+ newToken, _ := getToken()
+ t.Token = newToken.Token
+ t.ExpiresAt = newToken.ExpiresAt
+ }
+}
+
+func getToken() (*token, error) {
+ var buf bytes.Buffer
+
+ t := template.Must(template.New("json template1").Parse(tokenJSONTemplate))
+ p := userInfo{
+ UserDomainName: env.OSUserDomainName,
+ UserName: env.OSUsername,
+ Password: env.OSPassword,
+ ProjectDomainName: env.OSProjectDomainName,
+ ProjectName: env.OSProjectName,
+ }
+ t.Execute(&buf, p)
+
+ body := bytes.NewReader(buf.Bytes())
+ req, err := http.NewRequest("POST", env.OSAuthURL+"/auth/tokens?nocatalog", body)
+ if err != nil {
+ return &token{"", time.Unix(0, 0)}, fmt.Errorf("http request failed: %v", err)
+ }
+ req.Header.Set("Content-Type", "application/json")
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return &token{"", time.Unix(0, 0)}, fmt.Errorf("http POST failed: %v", err)
+ }
+ defer resp.Body.Close()
+ b, err := ioutil.ReadAll(resp.Body)
+
+ tokenStr, ok := resp.Header["X-Subject-Token"]
+ if ok != true && len(tokenStr) != 1 {
+ return &token{"", time.Unix(0, 0)}, fmt.Errorf("no token in openstack reply")
+ }
+
+ var repl tokenReply
+ err = json.Unmarshal(b, &repl)
+
+ return &token{tokenStr[0], repl.Token.ExpiresAt}, nil
+}
+
+type service struct {
+ Description string `json:"description"`
+ Links struct {
+ Self string `json:"self"`
+ } `json:"links"`
+ Enabled bool `json:"enabled"`
+ Type string `json:"type"`
+ ID string `json:"id"`
+ Name string `json:"name"`
+}
+
+type serviceListReply struct {
+ Services []service `json:"services"`
+}
+
+func (s *serviceListReply) GetService(name string) (*service, error) {
+ for _, v := range s.Services {
+ if v.Name == name {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no service id (%s) found", name)
+}
+
+type endPoint struct {
+ RegionID string `json:"region_id"`
+ Links struct {
+ Self string `json:"self"`
+ } `json:"links"`
+ URL string `json:"url"`
+ Region string `json:"region"`
+ Enabled bool `json:"enabled"`
+ Interface string `json:"interface"`
+ ServiceID string `json:"service_id"`
+ ID string `json:"id"`
+}
+
+type endPointReply struct {
+ Endpoints []endPoint `json:"endpoints"`
+}
+
+func (e *endPointReply) GetEndpoint(serviceid string, ifname string) (*endPoint, error) {
+ for _, v := range e.Endpoints {
+ if v.Interface == ifname && v.ServiceID == serviceid {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no endpoint found (%s/%s)", serviceid, ifname)
+}
+
+func getEndpoints(token *token) (endPointReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", env.OSAuthURL+"/endpoints", nil)
+ if err != nil {
+ return endPointReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return endPointReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+ //fmt.Printf("%s", string(b))
+
+ var repl endPointReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return endPointReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ //fmt.Printf("%v", repl)
+ return repl, nil
+}
+
+func getServiceList(token *token) (serviceListReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", env.OSAuthURL+"/services", nil)
+ if err != nil {
+ return serviceListReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return serviceListReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl serviceListReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return serviceListReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ return repl, nil
+}
+
+type neutronPort struct {
+ AllowedAddressPairs []interface{} `json:"allowed_address_pairs"`
+ ExtraDhcpOpts []interface{} `json:"extra_dhcp_opts"`
+ UpdatedAt time.Time `json:"updated_at"`
+ DeviceOwner string `json:"device_owner"`
+ RevisionNumber int `json:"revision_number"`
+ PortSecurityEnabled bool `json:"port_security_enabled"`
+ BindingProfile struct {
+ } `json:"binding:profile"`
+ FixedIps []struct {
+ SubnetID string `json:"subnet_id"`
+ IPAddress string `json:"ip_address"`
+ } `json:"fixed_ips"`
+ ID string `json:"id"`
+ SecurityGroups []interface{} `json:"security_groups"`
+ BindingVifDetails struct {
+ PortFilter bool `json:"port_filter"`
+ DatapathType string `json:"datapath_type"`
+ OvsHybridPlug bool `json:"ovs_hybrid_plug"`
+ } `json:"binding:vif_details"`
+ BindingVifType string `json:"binding:vif_type"`
+ MacAddress string `json:"mac_address"`
+ ProjectID string `json:"project_id"`
+ Status string `json:"status"`
+ BindingHostID string `json:"binding:host_id"`
+ Description string `json:"description"`
+ Tags []interface{} `json:"tags"`
+ QosPolicyID interface{} `json:"qos_policy_id"`
+ Name string `json:"name"`
+ AdminStateUp bool `json:"admin_state_up"`
+ NetworkID string `json:"network_id"`
+ TenantID string `json:"tenant_id"`
+ CreatedAt time.Time `json:"created_at"`
+ BindingVnicType string `json:"binding:vnic_type"`
+ DeviceID string `json:"device_id"`
+}
+
+type neutronPortReply struct {
+ Ports []neutronPort `json:"ports"`
+}
+
+func getNeutronPorts(token *token, endpoint string) (neutronPortReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", endpoint+"/v2.0/ports", nil)
+ if err != nil {
+ return neutronPortReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return neutronPortReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl neutronPortReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return neutronPortReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ return repl, nil
+}
+
+func (n *neutronPortReply) GetNeutronPortfromMAC(mac string) (*neutronPort,
+ error) {
+ for _, v := range n.Ports {
+ if v.MacAddress == strings.ToLower(mac) {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no port (%s) found", mac)
+}
+
+type neutronNetwork struct {
+ ProviderPhysicalNetwork string `json:"provider:physical_network"`
+ Ipv6AddressScope interface{} `json:"ipv6_address_scope"`
+ RevisionNumber int `json:"revision_number"`
+ PortSecurityEnabled bool `json:"port_security_enabled"`
+ Mtu int `json:"mtu"`
+ ID string `json:"id"`
+ RouterExternal bool `json:"router:external"`
+ AvailabilityZoneHints []interface{} `json:"availability_zone_hints"`
+ AvailabilityZones []string `json:"availability_zones"`
+ ProviderSegmentationID interface{} `json:"provider:segmentation_id"`
+ Ipv4AddressScope interface{} `json:"ipv4_address_scope"`
+ Shared bool `json:"shared"`
+ ProjectID string `json:"project_id"`
+ Status string `json:"status"`
+ Subnets []string `json:"subnets"`
+ Description string `json:"description"`
+ Tags []interface{} `json:"tags"`
+ UpdatedAt time.Time `json:"updated_at"`
+ IsDefault bool `json:"is_default"`
+ QosPolicyID interface{} `json:"qos_policy_id"`
+ Name string `json:"name"`
+ AdminStateUp bool `json:"admin_state_up"`
+ TenantID string `json:"tenant_id"`
+ CreatedAt time.Time `json:"created_at"`
+ ProviderNetworkType string `json:"provider:network_type"`
+}
+
+type neutronNetworkReply struct {
+ Networks []neutronNetwork `json:"networks"`
+}
+
+func (n *neutronNetworkReply) GetNetworkFromID(netid string) (*neutronNetwork, error) {
+ for _, v := range n.Networks {
+ if v.ID == netid {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no network (%s) found", netid)
+}
+
+func getNetworkReply(token *token, endpoint string) (neutronNetworkReply, error) {
+ token.CheckToken()
+ req, err := http.NewRequest("GET", endpoint+"/v2.0/networks", nil)
+ if err != nil {
+ return neutronNetworkReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return neutronNetworkReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl neutronNetworkReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return neutronNetworkReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+ return repl, nil
+}
+
+type novaCompute struct {
+ ID string `json:"id"`
+ Links []struct {
+ Href string `json:"href"`
+ Rel string `json:"rel"`
+ } `json:"links"`
+ Name string `json:"name"`
+}
+
+type novaComputeReply struct {
+ Servers []novaCompute `json:"servers"`
+}
+
+func (n *novaComputeReply) GetComputeFromID(vmid string) (*novaCompute, error) {
+ for _, v := range n.Servers {
+ if v.ID == vmid {
+ return &v, nil
+ }
+ }
+ return nil, fmt.Errorf("no vm (%s) found", vmid)
+}
+
+func getComputeReply(token *token, endpoint string) (novaComputeReply, error) {
+ token.CheckToken()
+ values := url.Values{}
+ values.Add("all_tenants", "1")
+
+ req, err := http.NewRequest("GET", endpoint+"/servers", nil)
+ if err != nil {
+ return novaComputeReply{}, fmt.Errorf("request failed:%v", err)
+ }
+ req.Header.Set("X-Auth-Token", token.Token)
+ req.URL.RawQuery = values.Encode()
+
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return novaComputeReply{}, fmt.Errorf("http GET failed:%v", err)
+ }
+ defer resp.Body.Close()
+
+ b, err := ioutil.ReadAll(resp.Body)
+
+ var repl novaComputeReply
+ err = json.Unmarshal(b, &repl)
+ if err != nil {
+ return novaComputeReply{}, fmt.Errorf("http reply decoding failed:%v", err)
+ }
+
+ return repl, nil
+}
+
+type osNeutronInterfaceAnnotation struct {
+ IfName string
+ VMName string
+ NetworkName string
+}
+
+// RunNeutronInfoFetch gets redis key update from libvirt and get network information
+// from Neutron with REST api. The retrieved information is stored under redis,
+// if/<tap name>/neutron_network
+func RunNeutronInfoFetch(ctx context.Context, config *Config, vmIfInfo chan string) error {
+ env = &config.InfoFetch
+ token, err := getToken()
+
+ if err != nil {
+ fmt.Fprintf(os.Stderr, "cannot get token: %v", err)
+ return err
+ }
+
+ svc, _ := getServiceList(token)
+ neuID, _ := svc.GetService("neutron")
+ //fmt.Printf("neutron id:%s\n", id.ID)
+
+ novaID, _ := svc.GetService("nova")
+ //fmt.Printf("nova id:%s\n", id.ID)
+
+ endpoints, _ := getEndpoints(token)
+ neuEndpoint, _ := endpoints.GetEndpoint(neuID.ID, "admin")
+ //fmt.Printf("neutron endpoint:%s\n", neuEndpoint.URL)
+
+ novaEndpoint, _ := endpoints.GetEndpoint(novaID.ID, "admin")
+ //fmt.Printf("nova endpoint:%s\n", novaEndpoint.URL)
+
+ getComputeReply(token, novaEndpoint.URL)
+ getNeutronPorts(token, neuEndpoint.URL)
+ //vmrepl, _ := getComputeReply(token, novaEndpoint.URL)
+ //prepl, _ := getNeutronPorts(token, neuEndpoint.URL)
+
+EVENTLOOP:
+ for {
+ select {
+ case <-ctx.Done():
+ break EVENTLOOP
+ case key := <-vmIfInfo:
+ log.Printf("Incoming IF: %v", key)
+ libvirtIfInfo, err := infoPool.Get(key)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ var ifInfo osVMInterfaceAnnotation
+ err = json.Unmarshal([]byte(libvirtIfInfo), &ifInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ vmrepl, _ := getComputeReply(token, novaEndpoint.URL)
+ prepl, _ := getNeutronPorts(token, neuEndpoint.URL)
+ nrepl, _ := getNetworkReply(token, neuEndpoint.URL)
+ netid, _ := prepl.GetNeutronPortfromMAC(ifInfo.MacAddr)
+ net, _ := nrepl.GetNetworkFromID(netid.NetworkID)
+ vm, _ := vmrepl.GetComputeFromID(netid.DeviceID)
+ osIfInfo := osNeutronInterfaceAnnotation{
+ IfName: ifInfo.Target,
+ VMName: vm.Name,
+ NetworkName: net.Name}
+
+ osIfInfoJSON, err := json.Marshal(osIfInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ log.Printf("Get: vmname: %s / networkname:%s", vm.Name, net.Name)
+ infoPool.Set(fmt.Sprintf("if/%s/%s", ifInfo.Target, "neutron_network"), string(osIfInfoJSON))
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/src/dma/cmd/infofetch/virsh_domain.go b/src/dma/cmd/infofetch/virsh_domain.go
new file mode 100644
index 00000000..b79f5bdd
--- /dev/null
+++ b/src/dma/cmd/infofetch/virsh_domain.go
@@ -0,0 +1,264 @@
+/*
+ * Copyright 2017 Red Hat
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "encoding/xml"
+ "fmt"
+ libvirt "github.com/libvirt/libvirt-go"
+ "log"
+)
+
+type instance struct {
+ Name string `xml:"name"`
+ Owner struct {
+ User string `xml:"user"`
+ Project string `xml:"project"`
+ } `xml:"owner"`
+ Flavor struct {
+ Name string `xml:"name,attr"`
+ } `xml:"flavor"`
+}
+
+type domain struct {
+ Name string `xml:"name"`
+ Devices struct {
+ Interfaces []struct {
+ Type string `xml:"type,attr"`
+ Mac struct {
+ Address string `xml:"address,attr"`
+ } `xml:"mac"`
+ Target struct {
+ Dev string `xml:"dev,attr"`
+ } `xml:"target"`
+ } `xml:"interface"`
+ } `xml:"devices"`
+}
+
+type osVMAnnotation struct {
+ Name string
+ Owner string
+ Project string
+ Flavor string
+}
+
+type osVMInterfaceAnnotation struct {
+ Type string
+ MacAddr string
+ Target string
+ VMName string
+}
+
+func parseNovaMetadata(metadata string) (*osVMAnnotation, error) {
+ data := new(instance)
+
+ if err := xml.Unmarshal([]byte(metadata), data); err != nil {
+ log.Println("XML Unmarshal error:", err)
+ return nil, err
+ }
+ log.Printf("Get name: %s user: %s, project: %s, flavor: %s", data.Name, data.Owner.User, data.Owner.Project, data.Flavor.Name)
+ return &osVMAnnotation{
+ Name: data.Name,
+ Owner: data.Owner.User,
+ Project: data.Owner.Project,
+ Flavor: data.Flavor.Name}, nil
+}
+
+func parseXMLForMAC(dumpxml string) (*[]osVMInterfaceAnnotation, error) {
+ data := new(domain)
+
+ if err := xml.Unmarshal([]byte(dumpxml), data); err != nil {
+ log.Println("XML Unmarshal error:", err)
+ return nil, err
+ }
+
+ ifAnnotation := make([]osVMInterfaceAnnotation, len(data.Devices.Interfaces))
+ for i, v := range data.Devices.Interfaces {
+ log.Printf("Interface type: %s, mac_addr: %s, target_dev: %s", v.Type, v.Mac.Address, v.Target.Dev)
+ ifAnnotation[i] = osVMInterfaceAnnotation{
+ Type: v.Type,
+ MacAddr: v.Mac.Address,
+ Target: v.Target.Dev,
+ VMName: data.Name}
+ }
+ return &ifAnnotation, nil
+}
+
+func setInterfaceAnnotation(ifInfo *[]osVMInterfaceAnnotation, vmIfInfoChan chan string) {
+ for _, v := range *ifInfo {
+ ifInfoJSON, err := json.Marshal(v)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ infoPool.Set(fmt.Sprintf("if/%s/%s", v.Target, "network"), string(ifInfoJSON))
+
+ vmIfInfoChan <- fmt.Sprintf("if/%s/%s", v.Target, "network")
+ }
+ return
+}
+
+func domainEventLifecycleCallback(vmIfInfo chan string) func(c *libvirt.Connect, d *libvirt.Domain, event *libvirt.DomainEventLifecycle) {
+
+ return func(c *libvirt.Connect,
+ d *libvirt.Domain, event *libvirt.DomainEventLifecycle) {
+ domName, _ := d.GetName()
+
+ switch event.Event {
+ case libvirt.DOMAIN_EVENT_DEFINED:
+ // VM defined: vmname (libvirt, nova), user, project, flavor
+ // Redis: <vnname>/vminfo
+ log.Printf("Event defined: domName: %s, event: %v", domName, event)
+ metadata, err := d.GetMetadata(libvirt.DOMAIN_METADATA_ELEMENT, "http://openstack.org/xmlns/libvirt/nova/1.0", libvirt.DOMAIN_AFFECT_CONFIG)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ vmInfo, err := parseNovaMetadata(metadata)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ vmInfoJSON, err := json.Marshal(vmInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", domName, "vminfo"), string(vmInfoJSON))
+ case libvirt.DOMAIN_EVENT_STARTED:
+ // VM started: interface type, interface mac addr, intarface type
+ // Redis: <vnname>/interfaces
+ log.Printf("Event started: domName: %s, event: %v", domName, event)
+
+ xml, err := d.GetXMLDesc(0)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ ifInfo, err := parseXMLForMAC(xml)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ setInterfaceAnnotation(ifInfo, vmIfInfo)
+
+ ifInfoJSON, err := json.Marshal(ifInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", domName, "interfaces"), string(ifInfoJSON))
+ case libvirt.DOMAIN_EVENT_UNDEFINED:
+ log.Printf("Event undefined: domName: %s, event: %v", domName, event)
+ vmIFInfo, err := infoPool.Get(fmt.Sprintf("vm/%s/%s", domName, "interfaces"))
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ var interfaces []osVMInterfaceAnnotation
+ err = json.Unmarshal([]byte(vmIFInfo), &interfaces)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ } else {
+ for _, v := range interfaces {
+ infoPool.Del(fmt.Sprintf("if/%s/%s", v.Target, "network"))
+ infoPool.Del(fmt.Sprintf("if/%s/%s", v.Target, "neutron_network"))
+ }
+ }
+ }
+ infoPool.Del(fmt.Sprintf("vm/%s/%s", domName, "vminfo"))
+ infoPool.Del(fmt.Sprintf("vm/%s/%s", domName, "interfaces"))
+ default:
+ log.Printf("Event misc: domName: %s, event: %v", domName, event)
+ }
+ }
+}
+
+// GetActiveDomain gets all active domain information from libvirt and it should be called at startup to get
+// current running domain information
+func GetActiveDomain(conn *libvirt.Connect, vmIfInfoChan chan string) error {
+ doms, err := conn.ListAllDomains(libvirt.CONNECT_LIST_DOMAINS_ACTIVE)
+ if err != nil {
+ log.Fatalf("libvirt dom list error: %s", err)
+ return err
+ }
+
+ for _, d := range doms {
+ name, err := d.GetName()
+
+ // Get VM Info
+ metadata, err := d.GetMetadata(libvirt.DOMAIN_METADATA_ELEMENT, "http://openstack.org/xmlns/libvirt/nova/1.0", libvirt.DOMAIN_AFFECT_CONFIG)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ vmInfo, err := parseNovaMetadata(metadata)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ vmInfoJSON, err := json.Marshal(vmInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", name, "vminfo"), string(vmInfoJSON))
+
+ // Get Network info
+ xml, err := d.GetXMLDesc(0)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ ifInfo, err := parseXMLForMAC(xml)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ setInterfaceAnnotation(ifInfo, vmIfInfoChan)
+
+ ifInfoJSON, err := json.Marshal(ifInfo)
+ if err != nil {
+ log.Fatalf("Err: %v", err)
+ return err
+ }
+ infoPool.Set(fmt.Sprintf("vm/%s/%s", name, "interfaces"), string(ifInfoJSON))
+ }
+ return nil
+}
+
+// RunVirshEventLoop is event loop to watch libvirt update
+func RunVirshEventLoop(ctx context.Context, conn *libvirt.Connect, vmIfInfoChan chan string) error {
+ callbackID, err := conn.DomainEventLifecycleRegister(nil, domainEventLifecycleCallback(vmIfInfoChan))
+ if err != nil {
+ log.Fatalf("Err: callbackid: %d %v", callbackID, err)
+ }
+
+ libvirt.EventAddTimeout(5000, func(timer int) { return }) // 5000 = 5sec
+ log.Printf("Entering libvirt event loop()")
+EVENTLOOP:
+ for {
+ select {
+ case <-ctx.Done():
+ break EVENTLOOP
+ default:
+ if err := libvirt.EventRunDefaultImpl(); err != nil {
+ log.Fatalf("%v", err)
+ }
+ }
+ }
+ log.Printf("Quitting libvirt event loop()")
+
+ if err := conn.DomainEventDeregister(callbackID); err != nil {
+ log.Fatalf("%v", err)
+ }
+ return nil
+}