diff options
author | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-06 09:04:29 +0000 |
---|---|---|
committer | Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp> | 2018-09-07 06:03:01 +0000 |
commit | d61931341176dad9ccff7c967a10d88fe54218fa (patch) | |
tree | 526457882d4abe0c38d2242d6daa311bf8ef51cf /src/dma/cmd/server | |
parent | 73abc060f31a6bf866fa1dad0a1a6efdfd94d775 (diff) |
src: Add DMA localagent
Change-Id: Ibcee814fbc9a904448eeb368a1a26bbb69cf54aa
Signed-off-by: Toshiaki Takahashi <takahashi.tsc@ncos.nec.co.jp>
Diffstat (limited to 'src/dma/cmd/server')
-rw-r--r-- | src/dma/cmd/server/agent.go | 31 | ||||
-rw-r--r-- | src/dma/cmd/server/amqp.go | 108 | ||||
-rw-r--r-- | src/dma/cmd/server/api.go | 85 | ||||
-rw-r--r-- | src/dma/cmd/server/main.go | 85 |
4 files changed, 309 insertions, 0 deletions
diff --git a/src/dma/cmd/server/agent.go b/src/dma/cmd/server/agent.go new file mode 100644 index 00000000..ffcb4a97 --- /dev/null +++ b/src/dma/cmd/server/agent.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "os/exec" + "strings" +) + +func createCollectdConf() error { + outStatus, errStatus := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "status", "collectd").Output() + if errStatus != nil { + return fmt.Errorf("status NG") + } + if !strings.Contains(string(outStatus), "running") { + return fmt.Errorf("status not running") + } + + _, errStop := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "stop", "collectd").Output() + if errStop != nil { + return fmt.Errorf("stop NG") + } + + _, errStart := exec.Command("ssh", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "localhost", "sudo", "systemctl", "start", "collectd").Output() + if errStart != nil { + return fmt.Errorf("start NG") + } + + fmt.Println("All complete!") + + return nil +} diff --git a/src/dma/cmd/server/amqp.go b/src/dma/cmd/server/amqp.go new file mode 100644 index 00000000..4d080fe1 --- /dev/null +++ b/src/dma/cmd/server/amqp.go @@ -0,0 +1,108 @@ +/* + * Copyright 2017 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "github.com/streadway/amqp" + "log" + "os" + "strings" +) + +func failOnError(err error, msg string) { + if err != nil { + log.Fatalf("%s: %s", msg, err) + } +} + +func runSubscriber(ctx context.Context, config *Config) { + confDirPath := config.Server.CollectdConfDir + amqpURL := "amqp://" + config.Server.AmqpUser + ":" + config.Server.AmqpPassword + "@" + config.Server.AmqpHost + ":" + config.Server.AmqpPort + "/" + conn, err := amqp.Dial(amqpURL) + failOnError(err, "Failed to connect to RabbitMQ") + + defer conn.Close() + + ch, err := conn.Channel() + failOnError(err, "Failed to open a channel") + defer ch.Close() + + err = ch.ExchangeDeclare( + "collectd-conf", // name + "fanout", // type + false, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare an exchange") + + q, err := ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + failOnError(err, "Failed to declare a queue") + + err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "collectd-conf", // exchange + false, + nil) + failOnError(err, "Failed to bind a queue") + + msgs, err := ch.Consume( + q.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + failOnError(err, "Failed to register a consumer") + +EVENTLOOP: + for { + select { + case <-ctx.Done(): + break EVENTLOOP + case d, ok := <-msgs: + if ok { + dataText := strings.SplitN(string(d.Body), "/", 2) + + dst, err := os.Create(confDirPath + "/" + dataText[0]) + failOnError(err, "File create NG") + defer dst.Close() + + dst.Write(([]byte)(dataText[1])) + + err = createCollectdConf() + failOnError(err, "collectd conf NG") + + log.Printf(" [x] %s", d.Body) + } + } + } + +} diff --git a/src/dma/cmd/server/api.go b/src/dma/cmd/server/api.go new file mode 100644 index 00000000..e8add0a1 --- /dev/null +++ b/src/dma/cmd/server/api.go @@ -0,0 +1,85 @@ +/* + * Copyright 2017 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "github.com/labstack/echo" + "io" + "net/http" + "os" + "time" +) + +func runAPIServer(ctx context.Context, config *Config) { + confDirPath := config.Server.CollectdConfDir + e := echo.New() + + e.GET("/", func(c echo.Context) error { + return c.String(http.StatusOK, "GET OK") + }) + e.POST("/collectd/conf", func(c echo.Context) error { + + file, err := c.FormFile("file") + if err != nil { + return c.String(http.StatusInternalServerError, "file send NG") + } + + src, err := file.Open() + if err != nil { + return c.String(http.StatusInternalServerError, "file open NG") + } + defer src.Close() + + dst, err := os.Create(confDirPath + "/" + file.Filename) + if err != nil { + return c.String(http.StatusInternalServerError, "file create NG") + } + defer dst.Close() + + // Copy + if _, err = io.Copy(dst, src); err != nil { + return c.String(http.StatusInternalServerError, "file write NG") + } + + err = createCollectdConf() + if err != nil { + errstr := fmt.Sprintf("collectd conf NG:%v", err) + return c.String(http.StatusInternalServerError, errstr) + } + return c.String(http.StatusCreated, "collectd conf OK") + }) + + // Start server + go func() { + urlStr := ":" + config.Server.ListenPort + if err := e.Start(urlStr); err != nil { + e.Logger.Info("shutting down the server") + } + }() + + // Wait for context.Done() to gracefully shutdown the server with + // a timeout of 10 seconds. + <-ctx.Done() + ctxShutdown, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := e.Shutdown(ctxShutdown); err != nil { + e.Logger.Fatal(err) + } + +} diff --git a/src/dma/cmd/server/main.go b/src/dma/cmd/server/main.go new file mode 100644 index 00000000..2e028fa4 --- /dev/null +++ b/src/dma/cmd/server/main.go @@ -0,0 +1,85 @@ +/* + * Copyright 2017 NEC Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "flag" + "github.com/BurntSushi/toml" + "log" + "os" + "sync" +) + +var serverTypeOpt = flag.String("type", "both", "server type: both(default), pubsub, rest") + +// Config is ... +type Config struct { + Server ServerConfig +} + +// ServerConfig is ... +type ServerConfig struct { + ListenPort string `toml:"listen_port"` + + AmqpHost string `toml:"amqp_host"` + AmqpUser string `toml:"amqp_user"` + AmqpPassword string `toml:"amqp_password"` + AmqpPort string `toml:"amqp_port"` + + CollectdConfDir string `toml:"collectd_confdir"` +} + +func main() { + + var config Config + _, err := toml.DecodeFile("/etc/barometer-dma/config.toml", &config) + if err != nil { + log.Fatalf("Read error of config file") + } + + if f, err := os.Stat(config.Server.CollectdConfDir); os.IsNotExist(err) || !f.IsDir() { + log.Fatalf("Path \"%s\" is not a directory", config.Server.CollectdConfDir) + } + + var waitgroup sync.WaitGroup + + flag.Parse() + + if *serverTypeOpt == "both" || *serverTypeOpt == "pubsub" { + ctx := context.Background() + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + runSubscriber(ctx, &config) + }() + log.Printf("Waiting for publish.") + } + + if *serverTypeOpt == "both" || *serverTypeOpt == "rest" { + ctx := context.Background() + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + runAPIServer(ctx, &config) + }() + log.Printf("Waiting for REST.") + } + + waitgroup.Wait() + log.Printf("Server stop.") +} |