package config
import (
"errors"
viperLib "github.com/spf13/viper"
)
// Server groups the values read from the "server" section of the
// configuration file: the credentials and the address of the server.
type Server struct {
	// User, Password and Host are read from server.user, server.password
	// and server.host.
	User, Password, Host string
	// Port is read from server.port.
	Port int
}
// Queue describes a single message queue by its name.
type Queue struct {
// Name is the queue name taken from the "name" key of the queue's
// configuration section.
Name string
}
// Config is the complete configuration produced by ReadConfig.
type Config struct {
	// Server holds the values of the "server" section.
	Server Server
	// Wrappers holds one Queue per entry of the "wrappers" section.
	Wrappers []Queue
	// Status and Storage hold the values of status.name and storage.name.
	Status, Storage string
	// JobManager and WrapperOutput hold the queues described by the
	// "jobmanager" and "wrapperoutput" sections.
	JobManager, WrapperOutput Queue
}
// ReadConfig loads and validates the service configuration.
//
// The configuration is read from a TOML file named "config" located in the
// directory given by the MUSIC_MANAGER_SERVICE_CONFIG_FILE_LOCATION
// environment variable, or in /etc/music-manager/ when that variable is
// empty.
//
// The file must define server.host, server.port, server.user and
// server.password, at least one wrappers.<key> table containing a name, and a
// name for each of jobmanager, wrapperoutput, status and storage. The first
// problem found is returned as an error together with a zero Config.
//
// NOTE(review): wrappers are collected by ranging over a map, so the order of
// Config.Wrappers is unspecified and may differ between calls.
func ReadConfig() (Config, error) {
	const (
		envVariable           = "MUSIC_MANAGER_SERVICE_CONFIG_FILE_LOCATION"
		defaultConfigLocation = "/etc/music-manager/"
	)
	serverVariables := []string{"host", "port", "user", "password"}
	queueVariables := []string{"name"}
	requiredConfigEntities := []string{"wrappers", "status", "storage", "jobmanager", "wrapperoutput"}

	viper := viperLib.New()

	// Look for the config file location defined as an environment variable.
	if err := viper.BindEnv(envVariable); err != nil {
		return Config{}, errors.New("Fatal error reading config file: " + err.Error())
	}
	configFileLocation := viper.GetString(envVariable)
	if configFileLocation == "" {
		// Fall back to the default location.
		configFileLocation = defaultConfigLocation
	}

	viper.SetConfigName("config")
	viper.SetConfigType("toml")
	viper.AddConfigPath(configFileLocation)
	if err := viper.ReadInConfig(); err != nil {
		return Config{}, errors.New("Fatal error reading config file: " + err.Error())
	}

	for _, serverVariable := range serverVariables {
		if !viper.IsSet("server." + serverVariable) {
			return Config{}, errors.New("Fatal error reading config: no server " + serverVariable + " was found.")
		}
	}

	for _, requiredConfigEntity := range requiredConfigEntities {
		if !viper.IsSet(requiredConfigEntity) {
			return Config{}, errors.New("Fatal error reading config: no " + requiredConfigEntity + " config was found.")
		}
	}

	// missingQueueVariable returns the first required queue variable that is
	// not set under prefix, or "" when all of them are present.
	missingQueueVariable := func(prefix string) string {
		for _, queueVariable := range queueVariables {
			if !viper.IsSet(prefix + "." + queueVariable) {
				return queueVariable
			}
		}
		return ""
	}

	// Check Wrappers. "wrappers" may be set to something that is not a table
	// (for example a plain string), so the type assertion must be checked
	// instead of being allowed to panic.
	wrapperTables, ok := viper.Get("wrappers").(map[string]interface{})
	if !ok {
		return Config{}, errors.New("Fatal error reading config: wrappers has an invalid config: it must be a table of wrapper definitions.")
	}
	if len(wrapperTables) == 0 {
		return Config{}, errors.New("Fatal error reading config: no wrappers were found, at least one wrapper must be defined.")
	}
	wrappers := make([]Queue, 0, len(wrapperTables))
	for wrapperName := range wrapperTables {
		if missing := missingQueueVariable("wrappers." + wrapperName); missing != "" {
			return Config{}, errors.New("Fatal error reading config: wrapper " + wrapperName + " has an invalid config: " + missing + " is not defined.")
		}
		wrappers = append(wrappers, Queue{Name: viper.GetString("wrappers." + wrapperName + ".name")})
	}

	// Check JobManager
	if missing := missingQueueVariable("jobmanager"); missing != "" {
		return Config{}, errors.New("Fatal error reading config: jobmanager has an invalid config: " + missing + " is not defined.")
	}

	// Check WrapperOutput
	if missing := missingQueueVariable("wrapperoutput"); missing != "" {
		return Config{}, errors.New("Fatal error reading config: wrapperoutput has an invalid config: " + missing + " is not defined.")
	}

	// Check Status
	if !viper.IsSet("status.name") {
		return Config{}, errors.New("Fatal error reading config: status has an invalid config: name is not defined.")
	}

	// Check Storage
	if !viper.IsSet("storage.name") {
		return Config{}, errors.New("Fatal error reading config: storage has an invalid config: name is not defined.")
	}

	// Everything validated: build the result in one place so that no
	// half-populated Config ever escapes on an error path.
	return Config{
		Server: Server{
			User:     viper.GetString("server.user"),
			Password: viper.GetString("server.password"),
			Host:     viper.GetString("server.host"),
			Port:     viper.GetInt("server.port"),
		},
		Wrappers:      wrappers,
		Status:        viper.GetString("status.name"),
		Storage:       viper.GetString("storage.name"),
		JobManager:    Queue{Name: viper.GetString("jobmanager.name")},
		WrapperOutput: Queue{Name: viper.GetString("wrapperoutput.name")},
	}, nil
}
package manager
import (
"errors"
"fmt"
"strconv"
commontypes "github.com/a-castellano/music-manager-common-types/types"
"github.com/a-castellano/music-manager-job-router/config"
"github.com/streadway/amqp"
)
// ReadJobManagerJobs consumes one job from the jobmanager queue and hands it
// over to wrapperChannel.
//
// It connects to RabbitMQ using config.Server, declares the queue named
// config.JobManager.Name, and waits for a single delivery. The delivery is
// decoded and then handled as follows:
//
//   - A Die job is forwarded once per configured wrapper (with RequiredOrigin
//     set to the wrapper name) and once more with LastOrigin and
//     RequiredOrigin set to "JobRouter".
//   - A job whose LastOrigin is not "JobManager" is forwarded with its Error
//     set and Status set to false.
//   - Any other job is forwarded with LastOrigin set to "JobRouter".
//
// The function returns as soon as the delivery has been acknowledged; the
// sends on wrapperChannel happen in a background goroutine and may complete
// after the function has returned. The connection and channel are closed on
// return.
//
// An error is returned when the connection, channel, queue, QoS or consumer
// cannot be set up, when the consumer is closed before a delivery arrives,
// when the delivery cannot be decoded, or when it cannot be acknowledged.
func ReadJobManagerJobs(config config.Config, wrapperChannel chan commontypes.Job) error {
	connectionString := "amqp://" + config.Server.User + ":" + config.Server.Password + "@" + config.Server.Host + ":" + strconv.Itoa(config.Server.Port) + "/"

	conn, err := amqp.Dial(connectionString)
	if err != nil {
		return fmt.Errorf("Failed to stablish connection with RabbitMQ: %w", err)
	}
	defer conn.Close()

	jobManagerCh, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("Failed to open jobmanager RabbitMQ channel: %w", err)
	}
	// Deferred only after the error check: on failure there is no channel to close.
	defer jobManagerCh.Close()

	jobManagerQueue, err := jobManagerCh.QueueDeclare(
		config.JobManager.Name,
		true,  // Durable
		false, // DeleteWhenUnused
		false, // Exclusive
		false, // NoWait
		nil,   // arguments
	)
	if err != nil {
		return fmt.Errorf("Failed to declare jobmanager queue: %w", err)
	}

	err = jobManagerCh.Qos(
		1,     // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		return fmt.Errorf("Failed to set jobmanager QoS: %w", err)
	}

	jobsToProcess, err := jobManagerCh.Consume(
		jobManagerQueue.Name,
		"",    // consumer
		false, // auto-ack
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // args
	)
	if err != nil {
		return fmt.Errorf("Failed to register a consumer: %w", err)
	}

	// Buffered so the goroutine can report its outcome without waiting for
	// the receiver, and exactly one value is ever sent.
	processJobs := make(chan error, 1)

	go func() {
		// Only one delivery is handled per call: the connection is closed as
		// soon as the outcome is reported, so a second delivery could not be
		// acknowledged reliably.
		job, ok := <-jobsToProcess
		if !ok {
			processJobs <- errors.New("Jobmanager consumer was closed before any job was received.")
			return
		}

		jobToProcess, decodeJobErr := commontypes.DecodeJob(job.Body)
		if decodeJobErr != nil {
			// A message that cannot be decoded will not decode on redelivery
			// either, so it is rejected without requeueing. The Nack result is
			// ignored because the decode failure is reported regardless.
			job.Nack(false, false)
			processJobs <- errors.New("Empty job data received.")
			return
		}

		// Acknowledge before reporting: once the outcome is reported the
		// caller returns and closes the channel, and an unacknowledged
		// message would be requeued and processed again.
		if ackErr := job.Ack(false); ackErr != nil {
			processJobs <- fmt.Errorf("Failed to acknowledge jobmanager job: %w", ackErr)
			return
		}
		processJobs <- nil

		if jobToProcess.Type == commontypes.Die {
			for _, wrapper := range config.Wrappers {
				jobToWrapper := jobToProcess
				jobToWrapper.RequiredOrigin = wrapper.Name
				wrapperChannel <- jobToWrapper
			}
			// Kill RouteJobs Function
			jobToWrapperSender := jobToProcess
			jobToWrapperSender.LastOrigin = "JobRouter"
			jobToWrapperSender.RequiredOrigin = "JobRouter"
			wrapperChannel <- jobToWrapperSender
			return
		}

		// This function reads messages from jobManager only.
		if jobToProcess.LastOrigin != "JobManager" {
			jobToProcess.Error = "LastOrigin can only be 'JobManager'"
			jobToProcess.Status = false
		} else {
			jobToProcess.LastOrigin = "JobRouter"
		}
		wrapperChannel <- jobToProcess
	}()

	return <-processJobs
}
package status
import (
"encoding/json"
"errors"
"net/http"
commontypes "github.com/a-castellano/music-manager-common-types/types"
"bytes"
)
// UpdateJobStatus sends job to the status service.
//
// The job is encoded as JSON and sent with an HTTP POST to
// "http://" + statusService using client. An error is returned when the job
// cannot be encoded, when the request fails, or when the response status code
// is not 200; in the last case the error message includes the response status.
func UpdateJobStatus(client http.Client, statusService string, job commontypes.Job) error {
	jsonJob, err := json.Marshal(job)
	if err != nil {
		return err
	}

	resp, err := client.Post("http://"+statusService, "application/json", bytes.NewBuffer(jsonJob))
	if err != nil {
		return err
	}
	// The body is not read, but it must be closed so the underlying
	// connection is released.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errors.New("Failed to update status: " + resp.Status)
	}
	return nil
}
package storage
import (
"encoding/json"
"errors"
"net/http"
commontypes "github.com/a-castellano/music-manager-common-types/types"
"bytes"
)
// SendInfoToStorageManager sends job to the storage service.
//
// The job is encoded as JSON and sent with an HTTP POST to
// "http://" + storageService using client. An error is returned when the job
// cannot be encoded, when the request fails, or when the response status code
// is not 200; in the last case the error message includes the response status.
func SendInfoToStorageManager(client http.Client, storageService string, job commontypes.Job) error {
	jsonJob, err := json.Marshal(job)
	if err != nil {
		return err
	}

	resp, err := client.Post("http://"+storageService, "application/json", bytes.NewBuffer(jsonJob))
	if err != nil {
		return err
	}
	// The body is not read, but it must be closed so the underlying
	// connection is released.
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errors.New("Failed to send info to storage manager: " + resp.Status)
	}
	return nil
}
package wrappers
import (
"fmt"
"net/http"
"strconv"
commontypes "github.com/a-castellano/music-manager-common-types/types"
"github.com/a-castellano/music-manager-job-router/config"
"github.com/a-castellano/music-manager-job-router/status"
"github.com/a-castellano/music-manager-job-router/storage"
"github.com/streadway/amqp"
)
// RouteJobs routes every job received on wrapperChannel until a Die job
// addressed to "JobRouter" arrives, wrapperChannel is closed, or an error
// occurs.
//
// It connects to RabbitMQ using config.Server and declares one durable queue
// per entry of config.Wrappers; the order of config.Wrappers defines the
// order in which wrappers are tried. Each job is then handled as follows:
//
//   - LastOrigin == "JobManager": the job is published to the queue named by
//     RequiredOrigin, or to the first wrapper when RequiredOrigin is empty.
//     An unknown RequiredOrigin is an error.
//   - Status == false: the job is published to the wrapper following
//     LastOrigin when RequiredOrigin is empty and such a wrapper exists;
//     otherwise it is marked Finished and reported to the status service.
//   - Status == true: a job with RequiredOrigin "JobRouter" must be of type
//     Die and stops the function with a nil error; any other job is marked
//     Finished and reported to both the status and the storage services.
//
// client is used for the HTTP calls to the status and storage services. The
// first failure of any step is returned as an error.
func RouteJobs(config config.Config, wrapperChannel chan commontypes.Job, client http.Client) error {
	connectionString := "amqp://" + config.Server.User + ":" + config.Server.Password + "@" + config.Server.Host + ":" + strconv.Itoa(config.Server.Port) + "/"

	conn, err := amqp.Dial(connectionString)
	if err != nil {
		return fmt.Errorf("Failed to stablish connection with RabbitMQ: %w", err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		return fmt.Errorf("Failed to open a channel in RouteJobs: %w", err)
	}
	defer ch.Close()

	// wrapperOrder lists the wrapper queue names in configuration order and
	// wrapperQueuesPosition maps each name back to its index in that list.
	wrapperOrder := make([]string, 0, len(config.Wrappers))
	wrapperQueuesPosition := make(map[string]int, len(config.Wrappers))
	for _, wrapper := range config.Wrappers {
		_, err := ch.QueueDeclare(
			wrapper.Name, // name
			true,         // durable
			false,        // delete when unused
			false,        // exclusive
			false,        // no-wait
			nil,          // arguments
		)
		if err != nil {
			return fmt.Errorf("Failed to declare queue %s in RouteJobs: %w", wrapper.Name, err)
		}
		wrapperQueuesPosition[wrapper.Name] = len(wrapperOrder)
		wrapperOrder = append(wrapperOrder, wrapper.Name)
	}

	// publish encodes job and sends it as a persistent message to queue
	// through the default exchange.
	publish := func(queue string, job commontypes.Job) error {
		encodedJob, err := commontypes.EncodeJob(job)
		if err != nil {
			return fmt.Errorf("Failed to encode job for queue %s in RouteJobs: %w", queue, err)
		}
		err = ch.Publish(
			"",    // exchange
			queue, // routing key
			false, // mandatory
			false, // immediate
			amqp.Publishing{
				DeliveryMode: amqp.Persistent,
				ContentType:  "text/plain",
				Body:         encodedJob,
			})
		if err != nil {
			return fmt.Errorf("Failed to send job to qeue %s in RouteJobs: %w", queue, err)
		}
		return nil
	}

	// Ranging (instead of a bare receive) ends the loop when wrapperChannel is
	// closed, rather than spinning on zero-value jobs.
	for jobToRoute := range wrapperChannel {
		switch {
		case jobToRoute.LastOrigin == "JobManager":
			target := jobToRoute.RequiredOrigin
			if target == "" {
				// Send to first wrapper
				if len(wrapperOrder) == 0 {
					return fmt.Errorf("No wrappers are configured in RouteJobs.")
				}
				target = wrapperOrder[0]
			} else if _, ok := wrapperQueuesPosition[target]; !ok {
				// The required origin must be one of the configured wrappers.
				return fmt.Errorf("Wrapper '%s' does not exist.", target)
			}
			if err := publish(target, jobToRoute); err != nil {
				return err
			}

		case !jobToRoute.Status:
			// Job failed - check if there are wrappers left to process this job.
			// NOTE(review): a LastOrigin that is not a configured wrapper
			// resolves to position 0 here, so such jobs continue from the
			// second wrapper - confirm this is intended.
			nextPosition := wrapperQueuesPosition[jobToRoute.LastOrigin] + 1
			if jobToRoute.RequiredOrigin == "" && nextPosition < len(wrapperOrder) {
				// Send job to next wrapper
				if err := publish(wrapperOrder[nextPosition], jobToRoute); err != nil {
					return err
				}
				continue
			}
			// No more wrappers left, job is marked as failed
			jobToRoute.Finished = true
			if err := status.UpdateJobStatus(client, config.Status, jobToRoute); err != nil {
				return fmt.Errorf("Failed to send job to status Manager in RouteJobs: %w", err)
			}

		default:
			// Job finished or it is a Die signal
			if jobToRoute.RequiredOrigin == "JobRouter" {
				if jobToRoute.Type == commontypes.Die {
					return nil
				}
				return fmt.Errorf("Only JobType allowed when RequiredOrigin is JobRouter is Die.")
			}
			jobToRoute.Finished = true
			if err := status.UpdateJobStatus(client, config.Status, jobToRoute); err != nil {
				return fmt.Errorf("Failed to send job to status Manager in RouteJobs: %w", err)
			}
			if err := storage.SendInfoToStorageManager(client, config.Storage, jobToRoute); err != nil {
				return fmt.Errorf("Failed to send job to storage Manager in RouteJobs: %w", err)
			}
		}
	}
	return nil
}