Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,20 @@ The middle layer of the control system uses gRPCs for communications. The GraphQ

### Prerequisites

- [Rust >= 1.87](https://www.rust-lang.org/learn/get-started)
- [Rust >= 1.90](https://www.rust-lang.org/learn/get-started)
- [Protocol Buffer](https://grpc.io/docs/protoc-installation/)

### Environment variables
The following variables exist for configuring the service at runtime:
- `ALARMS_KAFKA_TOPIC` -> Topic name for alarms in Kafka
- `CLOCK_GRPC_HOST` -> Hostname for the clock gRPC service
- `DEVDB_GRPC_HOST` -> Hostname for the DevDB gRPC service
- `DPM_GRPC_HOST` -> Hostname for the DPM gRPC service
- `GRAPHQL_PORT` -> Port for clients to connect via GraphQL to this service
- `KAFKA_HOST` -> Hostname for the Controls Kafka instance
- `SCANNER_GRPC_HOST` -> Hostname for the wire scanner gRPC service
- `TLG_GRPC_HOST` -> Hostname for the TLG gRPC service


### Check out the project:

Expand Down
38 changes: 38 additions & 0 deletions src/env_var/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::env::{self, VarError};
use tracing::{error, warn};

pub struct EnvVal {
var_name: String,
result: Result<String, VarError>,
}
impl EnvVal {
pub fn into_str_or(self, default: &str) -> String {
self.result.unwrap_or_else(|err| {
warn!("{}. Using default: {}", err, default);
default.to_string()
})
}

pub fn into_u16_or(self, default: u16) -> u16 {
match self.result {
Ok(val) => match val.parse::<u16>() {
Ok(parsed) => parsed,
Err(err) => {
error!("Could not read the value for {}. {}. Using default: {}", self.var_name, err, default);
default
}
},
Err(err) => {
warn!("{}. Using default: {}", err, default);
default
}
}
}
}

pub fn get(var: &str) -> EnvVal {
EnvVal {
var_name: var.to_owned(),
result: env::var(var),
}
}
8 changes: 7 additions & 1 deletion src/g_rpc/clock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ use crate::g_rpc::proto::services::aclk::{
clock_event_client::ClockEventClient, EventInfo, SubscribeReq,
};

use crate::env_var;

const CLOCK_HOST: &str = "CLOCK_GRPC_HOST";
const DEFAULT_CLOCK_HOST: &str = "http://clx76.fnal.gov:6803";

pub async fn subscribe(
events: &[i32],
) -> Result<tonic::Response<tonic::Streaming<EventInfo>>, tonic::Status> {
match ClockEventClient::connect("http://clx76.fnal.gov:6803/").await {
let host = env_var::get(CLOCK_HOST).into_str_or(DEFAULT_CLOCK_HOST);
match ClockEventClient::connect(host).await {
Ok(mut client) => {
let req = SubscribeReq {
events: events.to_vec(),
Expand Down
8 changes: 7 additions & 1 deletion src/g_rpc/devdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ pub mod proto {
tonic::include_proto!("devdb");
}

use crate::env_var;

const DEVDB_HOST: &str = "DEVDB_GRPC_HOST";
const DEFAULT_DEVDB_HOST: &str = "http://10.200.24.105:6802";

pub async fn get_device_info(
device: &[String],
) -> Result<tonic::Response<proto::DeviceInfoReply>, tonic::Status> {
match DevDbClient::connect("http://10.200.24.105:6802/").await {
let host = env_var::get(DEVDB_HOST).into_str_or(DEFAULT_DEVDB_HOST);
match DevDbClient::connect(host).await {
Ok(mut client) => {
let req = proto::DeviceList {
device: device.to_vec(),
Expand Down
8 changes: 6 additions & 2 deletions src/g_rpc/dpm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::proto::{
SettingReply,
},
};
use crate::env_var;
use tokio::time::{timeout, Duration};
use tonic::transport::{Channel, Error};
use tracing::{error, info, instrument, warn};
Expand All @@ -15,13 +16,16 @@ type TonicStreamResult<T> =
Result<tonic::Response<tonic::Streaming<T>>, tonic::Status>;
type TonicQueryResult<T> = Result<T, tonic::Status>;

const DPM_HOST: &str = "DPM_GRPC_HOST";
const DEFAULT_DPM_HOST: &str = "http://dce07.fnal.gov:50051";

// Builds a sharable connection to the DPM pool. All instances will use the
// same connection.

pub async fn build_connection() -> Result<Connection, Error> {
const DPM: &str = "http://dce07.fnal.gov:50051/";
let host = env_var::get(DPM_HOST).into_str_or(DEFAULT_DPM_HOST);

Ok(Connection(DaqClient::connect(DPM).await?))
Ok(Connection(DaqClient::connect(host).await?))
}

#[instrument(skip(conn, jwt, devices))]
Expand Down
10 changes: 7 additions & 3 deletions src/g_rpc/tlg/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::env_var;
use proto::services::tlg_placement::{
tlg_placement_mutation_service_client::TlgPlacementMutationServiceClient,
tlg_placement_service_client::TlgPlacementServiceClient, TlgDevices,
Expand All @@ -13,20 +14,23 @@ pub mod proto {
}
}

const URL: &str = "http://10.200.24.116:9090/";
const TLG_HOST: &str = "TLG_GRPC_HOST";
const DEFAULT_TLG_HOST: &str = "http://10.200.24.116:9090";

// Local helper function to get a connection to the gRPC service.

async fn get_service_client(
) -> Result<TlgPlacementServiceClient<transport::Channel>, Status> {
TlgPlacementServiceClient::connect(URL)
let host = env_var::get(TLG_HOST).into_str_or(DEFAULT_TLG_HOST);
TlgPlacementServiceClient::connect(host)
.await
.map_err(|_| Status::unavailable("TLG service unavailable"))
}

async fn get_mutation_service_client(
) -> Result<TlgPlacementMutationServiceClient<transport::Channel>, Status> {
TlgPlacementMutationServiceClient::connect(URL)
let host = env_var::get(TLG_HOST).into_str_or(DEFAULT_TLG_HOST);
TlgPlacementMutationServiceClient::connect(host)
.await
.map_err(|_| Status::unavailable("TLG service unavailable"))
}
Expand Down
9 changes: 7 additions & 2 deletions src/g_rpc/wscan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ pub mod proto {
tonic::include_proto!("scanner");
}

const SCANNER_URL: &str = "http://unknown.fnal.gov:50051/";
use crate::env_var;

const WIRE_SCANNER_HOST: &str = "SCANNER_GRPC_HOST";
const DEFAULT_WIRE_SCANNER_HOST: &str = "http://unknown.fnal.gov:50051";

// Local helper function to get a connection to the gRPC service.

async fn get_client() -> Result<ScannerClient<transport::Channel>, Status> {
ScannerClient::connect(SCANNER_URL)
let host =
env_var::get(WIRE_SCANNER_HOST).into_str_or(DEFAULT_WIRE_SCANNER_HOST);
ScannerClient::connect(host)
.await
.map_err(|_| Status::unavailable("wire-scanner service unavailable"))
}
Expand Down
22 changes: 11 additions & 11 deletions src/graphql/alarms/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use async_graphql::{Context, Error, Object, Subscription};
use tokio_stream::wrappers::BroadcastStream;

use crate::env_var;
use crate::pubsub::{Snapshot, Subscriber};

const ALARMS_KAFKA_TOPIC: &str = "ACsys";
const ALARMS_KAFKA_TOPIC: &str = "ALARMS_KAFKA_TOPIC";
const DEFAULT_ALARMS_TOPIC: &str = "ACsys";
fn get_topic() -> String {
env_var::get(ALARMS_KAFKA_TOPIC).into_str_or(DEFAULT_ALARMS_TOPIC)
}

pub fn get_alarms_subscriber() -> Option<Subscriber> {
Subscriber::for_topic(String::from(ALARMS_KAFKA_TOPIC)).ok()
Subscriber::for_topic(get_topic()).ok()
}

#[derive(Default)]
Expand All @@ -15,7 +21,7 @@ impl AlarmsQueries {
async fn alarms_snapshot(
&self, _ctxt: &Context<'_>,
) -> Result<Vec<String>, Error> {
match Snapshot::for_topic(String::from(ALARMS_KAFKA_TOPIC)) {
match Snapshot::for_topic(get_topic()) {
Ok(snapshot) => Ok(snapshot.data),
Err(err) => Err(Error::new(format!("{}", err))),
}
Expand Down Expand Up @@ -50,7 +56,7 @@ mod tests {
#[tokio::test]
async fn get_alarms_snapshot_returns_err_when_bad_address() {
unsafe {
env::set_var("KAFKA_HOST_ADDR", "fake value");
env::set_var("KAFKA_HOST", "fake value");
}
let schema =
Schema::build(AlarmsQueries, EmptyMutation, AlarmsSubscriptions)
Expand All @@ -64,9 +70,6 @@ mod tests {
"#,
)
.await;
unsafe {
env::remove_var("KAFKA_HOST_ADDR");
}
assert_eq!(result.errors.len(), 1);
match result.errors.first() {
Some(err) => {
Expand All @@ -81,12 +84,9 @@ mod tests {
#[test]
fn get_alarms_subscriber_returns_none_when_bad_address() {
unsafe {
env::set_var("KAFKA_HOST_ADDR", "fake value");
env::set_var("KAFKA_HOST", "fake value");
}
assert!(get_alarms_subscriber().is_none());
unsafe {
env::remove_var("KAFKA_HOST_ADDR");
}
}

#[tokio::test]
Expand Down
18 changes: 11 additions & 7 deletions src/graphql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use axum::{
};
use tracing::{info, instrument};

use crate::env_var;
use crate::g_rpc::dpm::build_connection;

mod acsys;
Expand Down Expand Up @@ -285,18 +286,21 @@ async fn create_site() -> Router {
// configuration information from the submodules. All accesses are
// wrapped with CORS support from the `warp` crate.

const GQL_PORT: &str = "GRAPHQL_PORT";
#[cfg(not(debug_assertions))]
const DEFAULT_GQL_PORT: u16 = 8000;
#[cfg(debug_assertions)]
const DEFAULT_GQL_PORT: u16 = 8001;

pub async fn start_service() {
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

// Define the binding address for the web service. The address is
// different between the operational and development versions.

#[cfg(not(debug_assertions))]
const BIND_ADDR: SocketAddr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8000));
#[cfg(debug_assertions)]
const BIND_ADDR: SocketAddr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), 8001));
let port = env_var::get(GQL_PORT).into_u16_or(DEFAULT_GQL_PORT);
let bind_addr: SocketAddr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port));

// Load TLS certificate information. If there's an error, we panic.

Expand All @@ -317,7 +321,7 @@ pub async fn start_service() {

// Start the server.

axum_server::tls_rustls::bind_rustls(BIND_ADDR, config)
axum_server::tls_rustls::bind_rustls(bind_addr, config)
.serve(app.into_make_service())
.await
.unwrap();
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use tracing::{info, Level};

mod env_var;
mod g_rpc;
mod graphql;
mod pubsub;
Expand Down
16 changes: 7 additions & 9 deletions src/pubsub/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::env_var;
use kafka::{
client::{FetchOffset, GroupOffsetStorage},
consumer::Consumer,
};
use std::{
env,
error::Error,
fmt::{self, Debug},
sync::Arc,
Expand Down Expand Up @@ -65,11 +65,11 @@ impl MessageJob {
}
}

const DEFAULT_KAFKA_ADDR: &str = "acsys-services.fnal.gov:9092";
const KAFKA_HOST: &str = "KAFKA_HOST";
const DEFAULT_KAFKA_HOST: &str = "acsys-services.fnal.gov:9092";
fn get_consumer(topic: String) -> Result<Consumer, PubSubError> {
let addr = env::var("KAFKA_HOST_ADDR")
.unwrap_or_else(|_| String::from(DEFAULT_KAFKA_ADDR));
Consumer::from_hosts(vec![addr])
let host = env_var::get(KAFKA_HOST).into_str_or(DEFAULT_KAFKA_HOST);
Consumer::from_hosts(vec![host])
.with_topic(topic)
.with_fallback_offset(FetchOffset::Earliest)
.with_offset_storage(Some(GroupOffsetStorage::Kafka))
Expand Down Expand Up @@ -171,6 +171,7 @@ impl std::error::Error for PubSubError {}
#[cfg(test)]
mod tests {
use super::*;
use std::env;

#[test]
fn pubsub_error_display() {
Expand All @@ -181,15 +182,12 @@ mod tests {
#[test]
fn error_on_bad_subscriber_host() {
unsafe {
env::set_var("KAFKA_HOST_ADDR", "bad_host");
env::set_var(KAFKA_HOST, "bad_host");
}
let result = Subscriber::for_topic(String::from("my_topic"));
let err = result
.expect_err("Expected the connection to fail, but it succeeded");
assert_eq!(CANNED_ERR_MESSAGE, format!("{}", err));
unsafe {
env::remove_var("KAFKA_HOST_ADDR");
}
}

#[test]
Expand Down