|
| 1 | + |
1 | 2 | /*
|
2 | 3 | * Copyright (C) 2015 Google Inc.
|
3 | 4 | *
|
|
16 | 17 |
|
17 | 18 | package io.k8s.cassandra;
|
18 | 19 |
|
19 |
| -import org.apache.cassandra.config.Config; |
20 |
| -import org.apache.cassandra.config.ConfigurationLoader; |
21 |
| -import org.apache.cassandra.config.YamlConfigurationLoader; |
22 |
| -import org.apache.cassandra.exceptions.ConfigurationException; |
| 20 | +import java.io.IOException; |
| 21 | +import java.net.InetAddress; |
| 22 | +import java.util.Collections; |
| 23 | +import java.util.List; |
| 24 | +import java.util.Map; |
| 25 | + |
23 | 26 | import org.apache.cassandra.locator.SeedProvider;
|
24 |
| -import org.apache.cassandra.locator.SimpleSeedProvider; |
25 |
| -import org.apache.cassandra.utils.FBUtilities; |
26 | 27 | import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
27 | 28 | import org.codehaus.jackson.map.ObjectMapper;
|
28 | 29 | import org.slf4j.Logger;
|
29 | 30 | import org.slf4j.LoggerFactory;
|
30 | 31 |
|
31 |
| -import javax.net.ssl.*; |
32 |
| -import java.io.IOException; |
33 |
| -import java.net.InetAddress; |
34 |
| -import java.net.URL; |
35 |
| -import java.net.UnknownHostException; |
36 |
| -import java.nio.file.Files; |
37 |
| -import java.nio.file.Paths; |
38 |
| -import java.security.SecureRandom; |
39 |
| -import java.security.cert.X509Certificate; |
40 |
| -import java.util.ArrayList; |
41 |
| -import java.util.Collections; |
42 |
| -import java.util.List; |
43 |
| -import java.util.Map; |
| 32 | +import com.sun.jna.Native; |
44 | 33 |
|
45 | 34 | /**
|
46 | 35 | * Self discovery {@link SeedProvider} that creates a list of Cassandra Seeds by
|
47 | 36 | * communicating with the Kubernetes API.
|
48 |
| - * <p>Various System Variable can be used to configure this provider: |
| 37 | + * <p> |
| 38 | + * Various System Variable can be used to configure this provider: |
49 | 39 | * <ul>
|
50 |
| - * <li>KUBERNETES_PORT_443_TCP_ADDR defaults to kubernetes.default.svc.cluster.local</li> |
51 |
| - * <li>KUBERNETES_PORT_443_TCP_PORT defaults to 443</li> |
52 |
| - * <li>CASSANDRA_SERVICE defaults to cassandra</li> |
53 |
| - * <li>POD_NAMESPACE defaults to 'default'</li> |
54 |
| - * <li>CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds</li> |
55 |
| - * <li>K8S_ACCOUNT_TOKEN defaults to the path for the default token</li> |
| 40 | + * <li>CASSANDRA_SERVICE defaults to cassandra</li> |
| 41 | + * <li>POD_NAMESPACE defaults to 'default'</li> |
| 42 | + * <li>CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds</li> |
56 | 43 | * </ul>
|
57 | 44 | */
|
58 | 45 | public class KubernetesSeedProvider implements SeedProvider {
|
59 | 46 |
|
60 |
| - private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class); |
61 |
| - |
62 |
| - /** |
63 |
| - * default seeds to fall back on |
64 |
| - */ |
65 |
| - private List<InetAddress> defaultSeeds; |
66 |
| - |
67 |
| - private TrustManager[] trustAll; |
68 |
| - |
69 |
| - private HostnameVerifier trustAllHosts; |
70 |
| - |
71 |
| - /** |
72 |
| - * Create new Seeds |
73 |
| - * @param params |
74 |
| - */ |
75 |
| - public KubernetesSeedProvider(Map<String, String> params) { |
76 |
| - |
77 |
| - // Create default seeds |
78 |
| - defaultSeeds = createDefaultSeeds(); |
79 |
| - |
80 |
| - // TODO: Load the CA cert when it is available on all platforms. |
81 |
| - trustAll = new TrustManager[] { |
82 |
| - new X509TrustManager() { |
83 |
| - public void checkServerTrusted(X509Certificate[] certs, String authType) {} |
84 |
| - public void checkClientTrusted(X509Certificate[] certs, String authType) {} |
85 |
| - public X509Certificate[] getAcceptedIssuers() { return null; } |
86 |
| - } |
87 |
| - }; |
88 |
| - |
89 |
| - trustAllHosts = new HostnameVerifier() { |
90 |
| - public boolean verify(String hostname, SSLSession session) { |
91 |
| - return true; |
92 |
| - } |
93 |
| - }; |
94 |
| - } |
95 |
| - |
96 |
| - /** |
97 |
| - * Call kubernetes API to collect a list of seed providers |
98 |
| - * @return list of seed providers |
99 |
| - */ |
100 |
| - public List<InetAddress> getSeeds() { |
101 |
| - |
102 |
| - String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local"); |
103 |
| - String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443"); |
104 |
| - String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra"); |
105 |
| - String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default"); |
106 |
| - String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace); |
107 |
| - String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8"); |
108 |
| - Integer seedSize = Integer.valueOf(seedSizeVar); |
109 |
| - String accountToken = getEnvOrDefault("K8S_ACCOUNT_TOKEN", "/var/run/secrets/kubernetes.io/serviceaccount/token"); |
110 |
| - |
111 |
| - List<InetAddress> seeds = new ArrayList<InetAddress>(); |
112 |
| - try { |
113 |
| - String token = getServiceAccountToken(accountToken); |
114 |
| - |
115 |
| - SSLContext ctx = SSLContext.getInstance("SSL"); |
116 |
| - ctx.init(null, trustAll, new SecureRandom()); |
117 |
| - |
118 |
| - String PROTO = "https://"; |
119 |
| - URL url = new URL(PROTO + host + ":" + port + path + serviceName); |
120 |
| - logger.info("Getting endpoints from " + url); |
121 |
| - HttpsURLConnection conn = (HttpsURLConnection)url.openConnection(); |
122 |
| - |
123 |
| - // TODO: Remove this once the CA cert is propagated everywhere, and replace |
124 |
| - // with loading the CA cert. |
125 |
| - conn.setHostnameVerifier(trustAllHosts); |
126 |
| - |
127 |
| - conn.setSSLSocketFactory(ctx.getSocketFactory()); |
128 |
| - conn.addRequestProperty("Authorization", "Bearer " + token); |
129 |
| - ObjectMapper mapper = new ObjectMapper(); |
130 |
| - Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class); |
131 |
| - |
132 |
| - if (endpoints != null) { |
133 |
| - // Here is a problem point, endpoints.subsets can be null in first node cases. |
134 |
| - if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){ |
135 |
| - for (Subset subset : endpoints.subsets) { |
136 |
| - if (subset.addresses != null && !subset.addresses.isEmpty()) { |
137 |
| - for (Address address : subset.addresses) { |
138 |
| - seeds.add(InetAddress.getByName(address.ip)); |
139 |
| - |
140 |
| - if(seeds.size() >= seedSize) { |
141 |
| - logger.info("Available num endpoints: " + seeds.size()); |
142 |
| - return Collections.unmodifiableList(seeds); |
143 |
| - } |
144 |
| - } |
145 |
| - } |
146 |
| - } |
147 |
| - } |
148 |
| - logger.info("Available num endpoints: " + seeds.size()); |
149 |
| - } else { |
150 |
| - logger.warn("Endpoints are not available using default seeds in cassandra.yaml"); |
151 |
| - return Collections.unmodifiableList(defaultSeeds); |
152 |
| - } |
153 |
| - } catch (Exception ex) { |
154 |
| - logger.warn("Request to kubernetes apiserver failed, using default seeds in cassandra.yaml", ex); |
155 |
| - return Collections.unmodifiableList(defaultSeeds); |
156 |
| - } |
157 |
| - |
158 |
| - if (seeds.size() == 0) { |
159 |
| - // If we got nothing, we might be the first instance, in that case |
160 |
| - // fall back on the seeds that were passed in cassandra.yaml. |
161 |
| - logger.warn("Seeds are not available using default seeds in cassandra.yaml"); |
162 |
| - return Collections.unmodifiableList(defaultSeeds); |
163 |
| - } |
164 |
| - |
165 |
| - return Collections.unmodifiableList(seeds); |
166 |
| - } |
167 |
| - |
168 |
| - /** |
169 |
| - * Code taken from {@link SimpleSeedProvider}. This is used as a fall back |
170 |
| - * incase we don't find seeds |
171 |
| - * @return |
172 |
| - */ |
173 |
| - protected List<InetAddress> createDefaultSeeds() |
174 |
| - { |
175 |
| - Config conf; |
176 |
| - try { |
177 |
| - conf = loadConfig(); |
178 |
| - } |
179 |
| - catch (Exception e) { |
180 |
| - throw new AssertionError(e); |
181 |
| - } |
182 |
| - String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1); |
183 |
| - List<InetAddress> seeds = new ArrayList<InetAddress>(); |
184 |
| - for (String host : hosts) { |
185 |
| - try { |
186 |
| - seeds.add(InetAddress.getByName(host.trim())); |
187 |
| - } |
188 |
| - catch (UnknownHostException ex) { |
189 |
| - // not fatal... DD will bark if there end up being zero seeds. |
190 |
| - logger.warn("Seed provider couldn't lookup host {}", host); |
191 |
| - } |
192 |
| - } |
193 |
| - |
194 |
| - if(seeds.size() == 0) { |
195 |
| - try { |
196 |
| - seeds.add(InetAddress.getLocalHost()); |
197 |
| - } catch (UnknownHostException e) { |
198 |
| - logger.warn("Seed provider couldn't lookup localhost"); |
199 |
| - } |
200 |
| - } |
201 |
| - return Collections.unmodifiableList(seeds); |
202 |
| - } |
203 |
| - |
204 |
| - /** |
205 |
| - * Code taken from {@link SimpleSeedProvider} |
206 |
| - * @return |
207 |
| - */ |
208 |
| - protected static Config loadConfig() throws ConfigurationException |
209 |
| - { |
210 |
| - String loaderClass = System.getProperty("cassandra.config.loader"); |
211 |
| - ConfigurationLoader loader = loaderClass == null |
212 |
| - ? new YamlConfigurationLoader() |
213 |
| - : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading"); |
214 |
| - return loader.loadConfig(); |
215 |
| - } |
216 |
| - |
217 |
| - private static String getEnvOrDefault(String var, String def) { |
218 |
| - String val = System.getenv(var); |
219 |
| - if (val == null) { |
220 |
| - val = def; |
221 |
| - } |
222 |
| - return val; |
223 |
| - } |
224 |
| - |
225 |
| - private static String getServiceAccountToken(String file) { |
226 |
| - try { |
227 |
| - return new String(Files.readAllBytes(Paths.get(file))); |
228 |
| - } catch (IOException e) { |
229 |
| - logger.warn("unable to load service account token" + file); |
230 |
| - throw new RuntimeException("Unable to load services account token " + file); |
231 |
| - } |
232 |
| - } |
233 |
| - |
234 |
| - protected List<InetAddress> getDefaultSeeds() { |
235 |
| - return defaultSeeds; |
236 |
| - } |
237 |
| - |
238 |
| - @JsonIgnoreProperties(ignoreUnknown = true) |
239 |
| - static class Address { |
240 |
| - public String ip; |
241 |
| - } |
242 |
| - |
243 |
| - @JsonIgnoreProperties(ignoreUnknown = true) |
244 |
| - static class Subset { |
245 |
| - public List<Address> addresses; |
246 |
| - } |
247 |
| - |
248 |
| - @JsonIgnoreProperties(ignoreUnknown = true) |
249 |
| - static class Endpoints { |
250 |
| - public List<Subset> subsets; |
251 |
| - } |
| 47 | + private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class); |
| 48 | + |
| 49 | + |
| 50 | + /** |
| 51 | + * Create new seed provider |
| 52 | + * |
| 53 | + * @param params |
| 54 | + */ |
| 55 | + public KubernetesSeedProvider(Map<String, String> params) { |
| 56 | + } |
| 57 | + |
| 58 | + /** |
| 59 | + * Call Kubernetes API to collect a list of seed providers |
| 60 | + * |
| 61 | + * @return list of seed providers |
| 62 | + */ |
| 63 | + public List<InetAddress> getSeeds() { |
| 64 | + GoInterface go = (GoInterface) Native.loadLibrary("cassandra-seed.so", GoInterface.class); |
| 65 | + |
| 66 | + String service = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra"); |
| 67 | + String namespace = getEnvOrDefault("POD_NAMESPACE", "default"); |
| 68 | + |
| 69 | + String initialSeeds = getEnvOrDefault("CASSANDRA_SEEDS", ""); |
| 70 | + if (initialSeeds.equals("")) { |
| 71 | + initialSeeds = getEnvOrDefault("POD_IP", ""); |
| 72 | + } |
| 73 | + |
| 74 | + String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8"); |
| 75 | + Integer seedSize = Integer.valueOf(seedSizeVar); |
| 76 | + |
| 77 | + String data = go.GetEndpoints(namespace, service, initialSeeds); |
| 78 | + ObjectMapper mapper = new ObjectMapper(); |
| 79 | + |
| 80 | + try { |
| 81 | + Endpoints endpoints = mapper.readValue(data, Endpoints.class); |
| 82 | + logger.info("cassandra seeds: " + endpoints.ips.toString()); |
| 83 | + return Collections.unmodifiableList(endpoints.ips); |
| 84 | + } catch (IOException e) { |
| 85 | + // This should not happen |
| 86 | + logger.error("unexpected error building cassandra seeds: " + e.getMessage()); |
| 87 | + return Collections.emptyList(); |
| 88 | + } |
| 89 | + } |
| 90 | + |
| 91 | + private static String getEnvOrDefault(String var, String def) { |
| 92 | + String val = System.getenv(var); |
| 93 | + if (val == null) { |
| 94 | + val = def; |
| 95 | + } |
| 96 | + return val; |
| 97 | + } |
| 98 | + |
| 99 | + @JsonIgnoreProperties(ignoreUnknown = true) |
| 100 | + static class Endpoints { |
| 101 | + public List<InetAddress> ips; |
| 102 | + } |
252 | 103 | }
|
0 commit comments