Skip to content

Commit 65086cf

Browse files
committed
Adjust user memory via api
1 parent d8cf172 commit 65086cf

File tree

18 files changed

+390
-43
lines changed

18 files changed

+390
-43
lines changed

ansible/group_vars/all

+4
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ controller:
107107
authentication:
108108
spi: "{{ controller_authentication_spi | default('') }}"
109109
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
110+
username: "{{ controller_username | default('controller.user') }}"
111+
password: "{{ controller_password | default('controller.pass') }}"
110112
entitlement:
111113
spi: "{{ controller_entitlement_spi | default('') }}"
112114
protocol: "{{ controller_protocol | default('https') }}"
@@ -209,6 +211,8 @@ invoker:
209211
{% endif %}"
210212
extraEnv: "{{ invoker_extraEnv | default({}) }}"
211213
protocol: "{{ invoker_protocol | default('https') }}"
214+
username: "{{ invoker_username | default('invoker.user') }}"
215+
password: "{{ invoker_password | default('invoker.pass') }}"
212216
ssl:
213217
cn: "openwhisk-invokers"
214218
keyPrefix: "{{ __invoker_ssl_keyPrefix }}"

ansible/roles/controller/tasks/deploy.yml

+3
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"

ansible/roles/invoker/tasks/deploy.yml

+2
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@
279279
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
280280
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
281281
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
282+
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
283+
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"
282284

283285
- name: extend invoker dns env
284286
set_fact:

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

+5
Original file line numberDiff line numberDiff line change
@@ -291,4 +291,9 @@ object ConfigKeys {
291291
val azBlob = "whisk.azure-blob"
292292

293293
val whiskClusterName = "whisk.cluster.name"
294+
295+
val whiskControllerUsername = "whisk.controller.username"
296+
val whiskControllerPassword = "whisk.controller.password"
297+
val whiskInvokerUsername = "whisk.invoker.username"
298+
val whiskInvokerPassword = "whisk.invoker.password"
294299
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

+39
Original file line numberDiff line numberDiff line change
@@ -457,3 +457,42 @@ object StatusData extends DefaultJsonProtocol {
457457
implicit val serdes =
458458
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
459459
}
460+
461+
case class UserMemoryMessage(userMemory: ByteSize) extends Message {
462+
override def serialize = UserMemoryMessage.serdes.write(this).compactPrint
463+
}
464+
465+
object UserMemoryMessage extends DefaultJsonProtocol {
466+
implicit val serdes = new RootJsonFormat[UserMemoryMessage] {
467+
override def write(message: UserMemoryMessage): JsValue = {
468+
JsObject("userMemory" -> JsString(message.userMemory.toString))
469+
}
470+
471+
override def read(json: JsValue): UserMemoryMessage = {
472+
val userMemory = fromField[String](json, "userMemory")
473+
new UserMemoryMessage(ByteSize.fromString(userMemory))
474+
}
475+
}
476+
477+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
478+
}
479+
480+
case class InvokerConfiguration(invoker: Int, memory: ByteSize)
481+
482+
object InvokerConfigurationProtocol extends DefaultJsonProtocol {
483+
implicit val serdes = new RootJsonFormat[ByteSize] {
484+
override def write(obj: ByteSize): JsValue = JsObject("memory" -> JsString(obj.toString))
485+
486+
override def read(json: JsValue): ByteSize = {
487+
json match {
488+
case JsString(memory) => ByteSize.fromString(memory)
489+
case _ => throw new DeserializationException("Could not deserialize ByteSize")
490+
}
491+
}
492+
}
493+
implicit val invokerConfigurationFormat = jsonFormat2(InvokerConfiguration)
494+
implicit val invokerConfigurationListJsonFormat = new RootJsonFormat[List[InvokerConfiguration]] {
495+
def read(value: JsValue) = value.convertTo[List[InvokerConfiguration]]
496+
def write(f: List[InvokerConfiguration]) = ???
497+
}
498+
}

core/controller/src/main/resources/application.conf

+4
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,8 @@ whisk{
117117
file-system : true
118118
dir-path : "/swagger-ui/"
119119
}
120+
controller {
121+
username: "controller.user"
122+
password: "controller.pass"
123+
}
120124
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

+40-4
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
2424
import akka.http.scaladsl.model.StatusCodes._
25-
import akka.http.scaladsl.model.Uri
25+
import akka.http.scaladsl.model.{StatusCodes, Uri}
26+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2627
import akka.http.scaladsl.server.Route
2728
import akka.stream.ActorMaterializer
2829
import kamon.Kamon
@@ -32,8 +33,8 @@ import spray.json.DefaultJsonProtocol._
3233
import spray.json._
3334
import org.apache.openwhisk.common.Https.HttpsConfig
3435
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
35-
import org.apache.openwhisk.core.WhiskConfig
36-
import org.apache.openwhisk.core.connector.MessagingProvider
36+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
37+
import org.apache.openwhisk.core.connector.{InvokerConfiguration, MessagingProvider, UserMemoryMessage}
3738
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3839
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
3940
import org.apache.openwhisk.core.entitlement._
@@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
9798
(pathEndOrSingleSlash & get) {
9899
complete(info)
99100
}
100-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
101+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configMemory
101102
}
102103

103104
// initialize datastores
@@ -176,6 +177,41 @@ class Controller(val instance: ControllerInstanceId,
176177
LogLimit.config,
177178
runtimes,
178179
List(apiV1.basepath()))
180+
181+
private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
182+
private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)
183+
184+
/**
185+
* config user memory of ContainerPool
186+
*/
187+
import org.apache.openwhisk.core.connector.InvokerConfigurationProtocol._
188+
private val configMemory = {
189+
implicit val executionContext = actorSystem.dispatcher
190+
(path("config" / "memory") & post) {
191+
extractCredentials {
192+
case Some(BasicHttpCredentials(username, password)) =>
193+
if (username == controllerUsername && password == controllerPassword) {
194+
entity(as[String]) { memory =>
195+
val configMemoryList = memory.parseJson.convertTo[List[InvokerConfiguration]]
196+
configMemoryList.find(config => MemoryLimit.MIN_MEMORY.compare(config.memory) > 0) match {
197+
case Some(_) =>
198+
complete(StatusCodes.BadRequest, s"user memory can't be less than ${MemoryLimit.MIN_MEMORY}")
199+
case None =>
200+
configMemoryList.foreach { config =>
201+
val invoker = config.invoker
202+
val userMemoryMessage = UserMemoryMessage(config.memory)
203+
loadBalancer.sendChangeRequestToInvoker(userMemoryMessage, invoker)
204+
}
205+
complete(StatusCodes.Accepted)
206+
}
207+
}
208+
} else {
209+
complete(StatusCodes.Unauthorized, "username or password is wrong")
210+
}
211+
case _ => complete(StatusCodes.Unauthorized)
212+
}
213+
}
214+
}
179215
}
180216

181217
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

+8
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send user memory to invokers
65+
*
66+
* @param userMemory
67+
* @param targetInvokers
68+
*/
69+
def sendChangeRequestToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

+16
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,21 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send user memory to invokers */
321+
override def sendChangeRequestToInvoker(userMemoryMessage: UserMemoryMessage, targetInvoker: Int): Unit = {
322+
schedulingState.invokers.filter { invoker =>
323+
invoker.id.instance == targetInvoker
324+
} foreach { invokerHealth =>
325+
val topic = s"invoker${invokerHealth.id.toInt}"
326+
messageProducer.send(topic, userMemoryMessage).andThen {
327+
case Success(_) =>
328+
logging.info(this, s"successfully posted user memory configuration to topic $topic")
329+
case Failure(_) =>
330+
logging.error(this, s"failed posted user memory configuration to topic $topic")
331+
}
332+
}
333+
}
334+
319335
override val invokerPool =
320336
invokerPoolFactory.createInvokerPool(
321337
actorSystem,

core/invoker/src/main/resources/application.conf

+2
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ whisk {
171171
}
172172

173173
invoker {
174+
username: "invoker.user"
175+
password: "invoker.pass"
174176
protocol: http
175177
}
176178
runtime.delete.timeout = "30 seconds"

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

+13-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.openwhisk.core.containerpool
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
22-
import org.apache.openwhisk.core.connector.MessageFeed
22+
import org.apache.openwhisk.core.connector.{MessageFeed, UserMemoryMessage}
2323
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
@@ -31,6 +31,8 @@ import scala.util.{Random, Try}
3131

3232
case class ColdStartKey(kind: String, memory: ByteSize)
3333

34+
object UserMemoryQuery
35+
3436
case object EmitMetrics
3537

3638
case object AdjustPrewarmedContainer
@@ -68,6 +70,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
6870
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
6971
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
7072
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
73+
var latestUserMemory = poolConfig.userMemory
7174
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
7275
// buffered here to keep order of computation.
7376
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -209,7 +212,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
209212
s"Rescheduling Run message, too many message in the pool, " +
210213
s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " +
211214
s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " +
212-
s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " +
215+
s"maxContainersMemory ${latestUserMemory.toMB} MB, " +
213216
s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " +
214217
s"needed memory: ${r.action.limits.memory.megabytes} MB, " +
215218
s"waiting messages: ${runBuffer.size}")(r.msg.transid)
@@ -297,6 +300,13 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
297300
case RescheduleJob =>
298301
freePool = freePool - sender()
299302
busyPool = busyPool - sender()
303+
case userMemoryMessage: UserMemoryMessage =>
304+
logging.info(
305+
this,
306+
s"user memory is reconfigured from ${latestUserMemory.toString} to ${userMemoryMessage.userMemory.toString}")
307+
latestUserMemory = userMemoryMessage.userMemory
308+
case UserMemoryQuery =>
309+
sender() ! latestUserMemory.toString
300310
case EmitMetrics =>
301311
emitMetrics()
302312

@@ -444,7 +454,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
444454
def hasPoolSpaceFor[A](pool: Map[A, ContainerData],
445455
prewarmStartingPool: Map[A, (String, ByteSize)],
446456
memory: ByteSize): Boolean = {
447-
memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= poolConfig.userMemory.toMB
457+
memoryConsumptionOf(pool) + prewarmStartingPool.map(_._2._2.toMB).sum + memory.toMB <= latestUserMemory.toMB
448458
}
449459

450460
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.model.StatusCodes
22+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
23+
import akka.http.scaladsl.server.Route
24+
import org.apache.openwhisk.common.{Logging, TransactionId}
25+
import org.apache.openwhisk.core.ConfigKeys
26+
import org.apache.openwhisk.http.BasicRasService
27+
import org.apache.openwhisk.http.ErrorResponse.terminate
28+
import pureconfig._
29+
import spray.json.PrettyPrinter
30+
31+
import scala.concurrent.ExecutionContext
32+
33+
/**
34+
* Implements web server to handle certain REST API calls.
35+
*/
36+
class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext,
37+
val actorSystem: ActorSystem,
38+
val logger: Logging)
39+
extends BasicRasService {
40+
41+
val invokerUsername = loadConfigOrThrow[String](ConfigKeys.whiskInvokerUsername)
42+
val invokerPassword = loadConfigOrThrow[String](ConfigKeys.whiskInvokerPassword)
43+
44+
override def routes(implicit transid: TransactionId): Route = {
45+
super.routes ~ extractCredentials {
46+
case Some(BasicHttpCredentials(username, password))
47+
if username == invokerUsername && password == invokerPassword =>
48+
(path("config" / "memory") & get) {
49+
invoker.getUserMemory()
50+
}
51+
case _ =>
52+
implicit val jsonPrettyResponsePrinter = PrettyPrinter
53+
terminate(StatusCodes.Unauthorized)
54+
}
55+
}
56+
}
57+
58+
object DefaultInvokerServer extends InvokerServerProvider {
59+
override def instance(
60+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
61+
new DefaultInvokerServer(invoker)
62+
}

core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala

+4-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker
1919

2020
import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
22+
import akka.http.scaladsl.server.Route
2223
import akka.stream.ActorMaterializer
2324
import com.typesafe.config.ConfigValueFactory
2425
import kamon.Kamon
@@ -217,7 +218,9 @@ trait InvokerProvider extends Spi {
217218
}
218219

219220
// this trait can be used to add common implementation
220-
trait InvokerCore {}
221+
trait InvokerCore {
222+
def getUserMemory(): Route
223+
}
221224

222225
/**
223226
* An Spi for providing RestAPI implementation for invoker.
@@ -227,9 +230,3 @@ trait InvokerServerProvider extends Spi {
227230
def instance(
228231
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService
229232
}
230-
231-
object DefaultInvokerServer extends InvokerServerProvider {
232-
override def instance(
233-
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
234-
new BasicRasService {}
235-
}

0 commit comments

Comments
 (0)