Skip to content
7 changes: 5 additions & 2 deletions core/graphman/src/commands/deployment/pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn load_active_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<ActiveDeployment, PauseDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -76,7 +79,7 @@ pub async fn pause_active_deployment(
notification_sender: Arc<NotificationSender>,
active_deployment: ActiveDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get().await?;
let primary_conn = primary_pool.get_permitted().await?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.pause_subgraph(&active_deployment.site).await?;
Expand Down
15 changes: 12 additions & 3 deletions core/graphman/src/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ impl Deployment {
&self,
primary_pool: ConnectionPool,
) -> Result<Option<NodeId>, GraphmanError> {
let primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let node = catalog_conn
.assigned_node(&self.site)
Expand Down Expand Up @@ -58,7 +61,10 @@ pub async fn load_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<Deployment, ReassignDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -87,7 +93,10 @@ pub async fn reassign_deployment(
node: &NodeId,
curr_node: Option<NodeId>,
) -> Result<ReassignResult, ReassignDeploymentError> {
let primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);
let changes: Vec<AssignmentChange> = match &curr_node {
Some(curr) => {
Expand Down
7 changes: 5 additions & 2 deletions core/graphman/src/commands/deployment/resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn load_paused_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<PausedDeployment, ResumeDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -76,7 +79,7 @@ pub async fn resume_paused_deployment(
notification_sender: Arc<NotificationSender>,
paused_deployment: PausedDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get().await?;
let primary_conn = primary_pool.get_permitted().await?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn
Expand Down
7 changes: 5 additions & 2 deletions core/graphman/src/commands/deployment/unassign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ pub async fn load_assigned_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<AssignedDeployment, UnassignDeploymentError> {
let mut primary_conn = primary_pool.get().await.map_err(GraphmanError::from)?;
let mut primary_conn = primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
Expand Down Expand Up @@ -73,7 +76,7 @@ pub async fn unassign_deployment(
notification_sender: Arc<NotificationSender>,
deployment: AssignedDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get().await?;
let primary_conn = primary_pool.get_permitted().await?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.unassign_subgraph(&deployment.site).await?;
Expand Down
8 changes: 4 additions & 4 deletions graph/src/blockchain/firehose_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,11 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
// Back off exponentially whenever we encounter a connection error or a stream with bad data
let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45));

// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

try_stream! {
// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

loop {
let endpoint = client.firehose_endpoint().await?;
let logger = logger.new(o!("deployment" => deployment.clone(), "provider" => endpoint.provider.to_string()));
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use futures03::future::BoxFuture;
use graph_derive::CheapClone;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use slog::{error, Logger};
use slog::Logger;
use std::{
any::Any,
collections::{HashMap, HashSet},
Expand Down
8 changes: 4 additions & 4 deletions graph/src/blockchain/substreams_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ fn stream_blocks<C: Blockchain, F: BlockStreamMapper<C>>(
// Back off exponentially whenever we encounter a connection error or a stream with bad data
let mut backoff = ExponentialBackoff::new(Duration::from_millis(500), Duration::from_secs(45));

// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

let mut log_data = SubstreamsLogData::new();

try_stream! {
// This attribute is needed because `try_stream!` seems to break detection of `skip_backoff` assignments
#[allow(unused_assignments)]
let mut skip_backoff = false;

if !modules.modules.iter().any(|m| module_name.eq(&m.name)) {
Err(BlockStreamError::Fatal(format!(
"module `{}` not found",
Expand Down
1 change: 0 additions & 1 deletion graph/src/components/network_provider/provider_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::Duration;

use derivative::Derivative;
use itertools::Itertools;
use slog::error;
use slog::info;
use slog::warn;
use slog::Logger;
Expand Down
32 changes: 30 additions & 2 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl Shard {
fn validate(&mut self, name: &str) -> Result<()> {
ShardName::new(name.to_string()).map_err(|e| anyhow!(e))?;

self.connection = shellexpand::env(&self.connection)?.into_owned();
self.expand_connection()?;

if matches!(self.pool_size, PoolSize::None) {
return Err(anyhow!("missing pool size definition for shard `{}`", name));
Expand Down Expand Up @@ -301,6 +301,25 @@ impl Shard {
replicas,
})
}

fn expand_connection(&mut self) -> Result<()> {
let mut url = Url::parse(shellexpand::env(&self.connection)?.as_ref())?;
// Put the PGAPPNAME into the URL since tokio-postgres ignores this
// environment variable
if let Some(app_name) = std::env::var("PGAPPNAME").ok() {
let query = match url.query() {
Some(query) => {
format!("{query}&application_name={app_name}")
}
None => {
format!("application_name={app_name}")
}
};
url.set_query(Some(&query));
}
self.connection = url.to_string();
Ok(())
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -1944,6 +1963,10 @@ mod tests {
let query = NodeId::new("query_node_1").unwrap();
let other = NodeId::new("other_node_1").unwrap();

let appname = std::env::var("PGAPPNAME").ok();
unsafe {
std::env::set_var("PGAPPNAME", "config-test");
}
let shard = {
let mut shard = toml::from_str::<Shard>(
r#"
Expand All @@ -1961,10 +1984,15 @@ fdw_pool_size = [
shard.validate("index_node_1").unwrap();
shard
};
if let Some(appname) = appname {
unsafe {
std::env::set_var("PGAPPNAME", appname);
}
}

assert_eq!(
shard.connection,
"postgresql://postgres:postgres@postgres/graph"
"postgresql://postgres:postgres@postgres/graph?application_name=config-test"
);

assert_eq!(shard.pool_size.size_for(&index, "ashard").unwrap(), 20);
Expand Down
6 changes: 3 additions & 3 deletions node/src/manager/commands/assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub async fn unassign(
) -> Result<(), Error> {
let locator = search.locate_unique(&primary).await?;

let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = catalog::Connection::new(pconn);

let site = conn
Expand All @@ -38,7 +38,7 @@ pub async fn reassign(
let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?;
let locator = search.locate_unique(&primary).await?;

let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = catalog::Connection::new(pconn);

let site = conn
Expand Down Expand Up @@ -81,7 +81,7 @@ pub async fn pause_or_resume(
locator: &DeploymentLocator,
should_pause: bool,
) -> Result<(), Error> {
let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = catalog::Connection::new(pconn);

let site = conn
Expand Down
5 changes: 3 additions & 2 deletions node/src/manager/commands/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ pub async fn info(

pub async fn remove(primary: ConnectionPool, store: BlockStore, name: String) -> Result<(), Error> {
let sites = {
let mut conn =
graph_store_postgres::command_support::catalog::Connection::new(primary.get().await?);
let mut conn = graph_store_postgres::command_support::catalog::Connection::new(
primary.get_permitted().await?,
);
conn.find_sites_for_network(&name).await?
};

Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub async fn run(
if !start_block && (block_hash.is_none() || block_number.is_none()) {
bail!("--block-hash and --block-number must be specified when --start-block is not set");
}
let pconn = primary.get().await?;
let pconn = primary.get_permitted().await?;
let mut conn = store_catalog::Connection::new(pconn);

let subgraph_store = store.subgraph_store();
Expand Down
2 changes: 1 addition & 1 deletion node/src/manager/commands/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async fn site_and_conn(
let primary_pool = pools.get(&*PRIMARY_SHARD).unwrap();
let locator = search.locate_unique(primary_pool).await?;

let pconn = primary_pool.get().await?;
let pconn = primary_pool.get_permitted().await?;
let mut conn = store_catalog::Connection::new(pconn);

let site = conn
Expand Down
6 changes: 5 additions & 1 deletion server/graphman/src/resolvers/deployment_mutation/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use crate::resolvers::context::GraphmanContext;
use graphman::GraphmanError;

pub async fn run(ctx: &GraphmanContext, name: &String) -> Result<()> {
let primary_pool = ctx.primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_pool = ctx
.primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_pool);

let name = match SubgraphName::new(name) {
Expand Down
6 changes: 5 additions & 1 deletion server/graphman/src/resolvers/deployment_mutation/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use crate::resolvers::context::GraphmanContext;
use graphman::GraphmanError;

pub async fn run(ctx: &GraphmanContext, name: &String) -> Result<()> {
let primary_pool = ctx.primary_pool.get().await.map_err(GraphmanError::from)?;
let primary_pool = ctx
.primary_pool
.get_permitted()
.await
.map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_pool);

let name = match SubgraphName::new(name) {
Expand Down
8 changes: 4 additions & 4 deletions store/postgres/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub mod primary {
}

pub(super) async fn drop_chain(pool: &ConnectionPool, name: &str) -> Result<(), StoreError> {
let mut conn = pool.get().await?;
let mut conn = pool.get_permitted().await?;

delete(chains::table.filter(chains::name.eq(name)))
.execute(&mut conn)
Expand Down Expand Up @@ -427,7 +427,7 @@ impl BlockStore {
let cached = match self.chain_head_cache.get(shard.as_str()) {
Some(cached) => cached,
None => {
let mut conn = match pool.get().await {
let mut conn = match pool.get_permitted().await {
Ok(conn) => conn,
Err(StoreError::DatabaseUnavailable) => continue,
Err(e) => return Err(e),
Expand Down Expand Up @@ -568,7 +568,7 @@ impl BlockStore {
use crate::primary::db_version as dbv;

let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap();
let mut conn = primary_pool.get().await?;
let mut conn = primary_pool.get_permitted().await?;
let version: i64 = dbv::table
.select(dbv::version)
.get_result(&mut conn)
Expand Down Expand Up @@ -667,7 +667,7 @@ impl ChainIdStore for BlockStore {

// Update the master copy in the primary
let primary_pool = self.pools.get(&*PRIMARY_SHARD).unwrap();
let mut conn = primary_pool.get().await?;
let mut conn = primary_pool.get_permitted().await?;

diesel::update(c::table.filter(c::name.eq(chain_name.as_str())))
.set((
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/chain_head_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl ChainHeadUpdateSender {
"head_block_number": number
});

let mut conn = self.pool.get().await?;
let mut conn = self.pool.get_permitted().await?;
self.sender
.notify(
&mut conn,
Expand Down
Loading