aboutsummaryrefslogtreecommitdiffstats
path: root/src/dma/cmd/server/amqp.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/dma/cmd/server/amqp.go')
-rw-r--r--src/dma/cmd/server/amqp.go108
1 files changed, 108 insertions, 0 deletions
diff --git a/src/dma/cmd/server/amqp.go b/src/dma/cmd/server/amqp.go
new file mode 100644
index 00000000..4d080fe1
--- /dev/null
+++ b/src/dma/cmd/server/amqp.go
@@ -0,0 +1,108 @@
+/*
+ * Copyright 2017 NEC Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package main
+
+import (
+ "context"
+ "github.com/streadway/amqp"
+ "log"
+ "os"
+ "strings"
+)
+
+func failOnError(err error, msg string) {
+ if err != nil {
+ log.Fatalf("%s: %s", msg, err)
+ }
+}
+
+func runSubscriber(ctx context.Context, config *Config) {
+ confDirPath := config.Server.CollectdConfDir
+ amqpURL := "amqp://" + config.Server.AmqpUser + ":" + config.Server.AmqpPassword + "@" + config.Server.AmqpHost + ":" + config.Server.AmqpPort + "/"
+ conn, err := amqp.Dial(amqpURL)
+ failOnError(err, "Failed to connect to RabbitMQ")
+
+ defer conn.Close()
+
+ ch, err := conn.Channel()
+ failOnError(err, "Failed to open a channel")
+ defer ch.Close()
+
+ err = ch.ExchangeDeclare(
+ "collectd-conf", // name
+ "fanout", // type
+ false, // durable
+ false, // auto-deleted
+ false, // internal
+ false, // no-wait
+ nil, // arguments
+ )
+ failOnError(err, "Failed to declare an exchange")
+
+ q, err := ch.QueueDeclare(
+ "", // name
+ false, // durable
+ false, // delete when unused
+ true, // exclusive
+ false, // no-wait
+ nil, // arguments
+ )
+ failOnError(err, "Failed to declare a queue")
+
+ err = ch.QueueBind(
+ q.Name, // queue name
+ "", // routing key
+ "collectd-conf", // exchange
+ false,
+ nil)
+ failOnError(err, "Failed to bind a queue")
+
+ msgs, err := ch.Consume(
+ q.Name, // queue
+ "", // consumer
+ true, // auto-ack
+ false, // exclusive
+ false, // no-local
+ false, // no-wait
+ nil, // args
+ )
+ failOnError(err, "Failed to register a consumer")
+
+EVENTLOOP:
+ for {
+ select {
+ case <-ctx.Done():
+ break EVENTLOOP
+ case d, ok := <-msgs:
+ if ok {
+ dataText := strings.SplitN(string(d.Body), "/", 2)
+
+ dst, err := os.Create(confDirPath + "/" + dataText[0])
+ failOnError(err, "File create NG")
+ defer dst.Close()
+
+ dst.Write(([]byte)(dataText[1]))
+
+ err = createCollectdConf()
+ failOnError(err, "collectd conf NG")
+
+ log.Printf(" [x] %s", d.Body)
+ }
+ }
+ }
+
+}