diff options
author | CNlucius <lukai1@huawei.com> | 2016-09-13 11:40:12 +0800 |
---|---|---|
committer | CNlucius <lukai1@huawei.com> | 2016-09-13 11:41:53 +0800 |
commit | b731e2f1dd0972409b136aebc7b463dd72c9cfad (patch) | |
tree | 5107d7d80c19ad8076c2c97c2b5ef8d1cf3ab903 /framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app | |
parent | ee93993458266114c29271a481ef9ce7ce621b2a (diff) |
ONOSFW-171
O/S-SFC-ONOS scenario documentation
Change-Id: I51ae1cf736ea24ab6680f8edca1b2bf5dd598365
Signed-off-by: CNlucius <lukai1@huawei.com>
Diffstat (limited to 'framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app')
2 files changed, 0 insertions, 529 deletions
diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java deleted file mode 100644 index fe4aa0be..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java +++ /dev/null @@ -1,509 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.onosproject.store.app; - -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import org.apache.felix.scr.annotations.Activate; -import org.apache.felix.scr.annotations.Component; -import org.apache.felix.scr.annotations.Deactivate; -import org.apache.felix.scr.annotations.Reference; -import org.apache.felix.scr.annotations.ReferenceCardinality; -import org.apache.felix.scr.annotations.Service; -import org.onlab.util.KryoNamespace; -import org.onosproject.app.ApplicationDescription; -import org.onosproject.app.ApplicationEvent; -import org.onosproject.app.ApplicationException; -import org.onosproject.app.ApplicationState; -import org.onosproject.app.ApplicationStore; -import org.onosproject.app.ApplicationStoreDelegate; -import org.onosproject.cluster.ClusterService; -import org.onosproject.cluster.ControllerNode; -import org.onosproject.common.app.ApplicationArchive; -import org.onosproject.core.Application; -import org.onosproject.core.ApplicationId; -import org.onosproject.core.ApplicationIdStore; -import org.onosproject.core.CoreService; -import org.onosproject.core.DefaultApplication; -import org.onosproject.security.Permission; -import org.onosproject.store.cluster.messaging.ClusterCommunicationService; -import org.onosproject.store.cluster.messaging.MessageSubject; -import org.onosproject.store.serializers.KryoNamespaces; -import org.onosproject.store.service.EventuallyConsistentMap; -import org.onosproject.store.service.EventuallyConsistentMapEvent; -import org.onosproject.store.service.EventuallyConsistentMapListener; -import org.onosproject.store.service.LogicalClockService; -import org.onosproject.store.service.MultiValuedTimestamp; -import org.onosproject.store.service.StorageException; -import org.onosproject.store.service.StorageService; -import org.slf4j.Logger; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Function; - -import static com.google.common.collect.Multimaps.newSetMultimap; -import static com.google.common.collect.Multimaps.synchronizedSetMultimap; -import static com.google.common.io.ByteStreams.toByteArray; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static org.onlab.util.Tools.groupedThreads; -import static org.onlab.util.Tools.randomDelay; -import static org.onosproject.app.ApplicationEvent.Type.*; -import static org.onosproject.store.app.GossipApplicationStore.InternalState.*; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT; -import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE; -import static org.slf4j.LoggerFactory.getLogger; - -/** - * Manages inventory of applications in a distributed data store that uses - * optimistic replication and gossip based anti-entropy techniques. - */ -@Component(immediate = true) -@Service -public class GossipApplicationStore extends ApplicationArchive - implements ApplicationStore { - - private final Logger log = getLogger(getClass()); - - private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request"); - - private static final int MAX_LOAD_RETRIES = 5; - private static final int RETRY_DELAY_MS = 2_000; - - private static final int FETCH_TIMEOUT_MS = 10_000; - - public enum InternalState { - INSTALLED, ACTIVATED, DEACTIVATED - } - - private ScheduledExecutorService executor; - private ExecutorService messageHandlingExecutor; - - private EventuallyConsistentMap<ApplicationId, Application> apps; - private EventuallyConsistentMap<Application, InternalState> states; - private EventuallyConsistentMap<Application, Set<Permission>> permissions; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterCommunicationService clusterCommunicator; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ClusterService clusterService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected StorageService storageService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected LogicalClockService clockService; - - @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) - protected ApplicationIdStore idStore; - - // Multimap to track which apps are required by others apps - // app -> { required-by, ... } - // Apps explicitly activated will be required by the CORE app - private final Multimap<ApplicationId, ApplicationId> requiredBy = - synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet)); - - private ApplicationId coreAppId; - - @Activate - public void activate() { - KryoNamespace.Builder serializer = KryoNamespace.newBuilder() - .register(KryoNamespaces.API) - .register(MultiValuedTimestamp.class) - .register(InternalState.class); - - executor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store")); - - messageHandlingExecutor = Executors.newSingleThreadExecutor( - groupedThreads("onos/store/app", "message-handler")); - - clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST, - bytes -> new String(bytes, Charsets.UTF_8), - name -> { - try { - return toByteArray(getApplicationInputStream(name)); - } catch (IOException e) { - throw new StorageException(e); - } - }, - Function.identity(), - messageHandlingExecutor); - - // FIXME: Consider consolidating into a single map. - - apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder() - .withName("apps") - .withSerializer(serializer) - .withTimestampProvider((k, v) -> clockService.getTimestamp()) - .build(); - - states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder() - .withName("app-states") - .withSerializer(serializer) - .withTimestampProvider((k, v) -> clockService.getTimestamp()) - .build(); - - states.addListener(new InternalAppStatesListener()); - - permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder() - .withName("app-permissions") - .withSerializer(serializer) - .withTimestampProvider((k, v) -> clockService.getTimestamp()) - .build(); - - coreAppId = getId(CoreService.CORE_APP_NAME); - log.info("Started"); - } - - /** - * Loads the application inventory from the disk and activates apps if - * they are marked to be active. - */ - private void loadFromDisk() { - getApplicationNames().forEach(appName -> { - Application app = loadFromDisk(appName); - if (app != null && isActive(app.id().name())) { - activate(app.id(), false); - // TODO Load app permissions - } - }); - } - - private Application loadFromDisk(String appName) { - for (int i = 0; i < MAX_LOAD_RETRIES; i++) { - try { - // Directly return if app already exists - ApplicationId appId = getId(appName); - if (appId != null) { - return getApplication(appId); - } - - ApplicationDescription appDesc = getApplicationDescription(appName); - boolean success = appDesc.requiredApps().stream() - .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null); - return success ? create(appDesc, false) : null; - } catch (Exception e) { - log.warn("Unable to load application {} from disk; retrying", appName); - randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon - } - } - return null; - } - - @Deactivate - public void deactivate() { - clusterCommunicator.removeSubscriber(APP_BITS_REQUEST); - messageHandlingExecutor.shutdown(); - executor.shutdown(); - apps.destroy(); - states.destroy(); - permissions.destroy(); - log.info("Stopped"); - } - - @Override - public void setDelegate(ApplicationStoreDelegate delegate) { - super.setDelegate(delegate); - loadFromDisk(); - } - - @Override - public Set<Application> getApplications() { - return ImmutableSet.copyOf(apps.values()); - } - - @Override - public ApplicationId getId(String name) { - return idStore.getAppId(name); - } - - @Override - public Application getApplication(ApplicationId appId) { - return apps.get(appId); - } - - @Override - public ApplicationState getState(ApplicationId appId) { - Application app = apps.get(appId); - InternalState s = app == null ? null : states.get(app); - return s == null ? null : s == ACTIVATED ? - ApplicationState.ACTIVE : ApplicationState.INSTALLED; - } - - @Override - public Application create(InputStream appDescStream) { - ApplicationDescription appDesc = saveApplication(appDescStream); - if (hasPrerequisites(appDesc)) { - return create(appDesc, true); - } - throw new ApplicationException("Missing dependencies for app " + appDesc.name()); - } - - private boolean hasPrerequisites(ApplicationDescription app) { - return !app.requiredApps().stream().map(n -> getId(n)) - .anyMatch(id -> id == null || getApplication(id) == null); - } - - private Application create(ApplicationDescription appDesc, boolean updateTime) { - Application app = registerApp(appDesc); - if (updateTime) { - updateTime(app.id().name()); - } - apps.put(app.id(), app); - states.put(app, INSTALLED); - return app; - } - - @Override - public void remove(ApplicationId appId) { - Application app = apps.get(appId); - if (app != null) { - uninstallDependentApps(app); - apps.remove(appId); - states.remove(app); - permissions.remove(app); - } - } - - // Uninstalls all apps that depend on the given app. - private void uninstallDependentApps(Application app) { - getApplications().stream() - .filter(a -> a.requiredApps().contains(app.id().name())) - .forEach(a -> remove(a.id())); - } - - @Override - public void activate(ApplicationId appId) { - activate(appId, coreAppId); - } - - private void activate(ApplicationId appId, ApplicationId forAppId) { - requiredBy.put(appId, forAppId); - activate(appId, true); - } - - - private void activate(ApplicationId appId, boolean updateTime) { - Application app = apps.get(appId); - if (app != null) { - if (updateTime) { - updateTime(appId.name()); - } - activateRequiredApps(app); - states.put(app, ACTIVATED); - } - } - - // Activates all apps required by this application. - private void activateRequiredApps(Application app) { - app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id())); - } - - @Override - public void deactivate(ApplicationId appId) { - deactivateDependentApps(getApplication(appId)); - deactivate(appId, coreAppId); - } - - private void deactivate(ApplicationId appId, ApplicationId forAppId) { - requiredBy.remove(appId, forAppId); - if (requiredBy.get(appId).isEmpty()) { - Application app = apps.get(appId); - if (app != null) { - updateTime(appId.name()); - states.put(app, DEACTIVATED); - deactivateRequiredApps(app); - } - } - } - - // Deactivates all apps that require this application. - private void deactivateDependentApps(Application app) { - getApplications().stream() - .filter(a -> states.get(a) == ACTIVATED) - .filter(a -> a.requiredApps().contains(app.id().name())) - .forEach(a -> deactivate(a.id())); - } - - // Deactivates all apps required by this application. - private void deactivateRequiredApps(Application app) { - app.requiredApps().stream().map(this::getId).map(this::getApplication) - .filter(a -> states.get(a) == ACTIVATED) - .forEach(a -> deactivate(a.id(), app.id())); - } - - @Override - public Set<Permission> getPermissions(ApplicationId appId) { - Application app = apps.get(appId); - return app != null ? permissions.get(app) : null; - } - - @Override - public void setPermissions(ApplicationId appId, Set<Permission> permissions) { - Application app = getApplication(appId); - if (app != null) { - this.permissions.put(app, permissions); - delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app)); - } - } - - /** - * Listener to application state distributed map changes. - */ - private final class InternalAppStatesListener - implements EventuallyConsistentMapListener<Application, InternalState> { - @Override - public void event(EventuallyConsistentMapEvent<Application, InternalState> event) { - // If we do not have a delegate, refuse to process any events entirely. - // This is to allow the anti-entropy to kick in and process the events - // perhaps a bit later, but with opportunity to notify delegate. - if (delegate == null) { - return; - } - - Application app = event.key(); - InternalState state = event.value(); - - if (event.type() == PUT) { - if (state == INSTALLED) { - fetchBitsIfNeeded(app); - delegate.notify(new ApplicationEvent(APP_INSTALLED, app)); - - } else if (state == ACTIVATED) { - installAppIfNeeded(app); - setActive(app.id().name()); - delegate.notify(new ApplicationEvent(APP_ACTIVATED, app)); - - } else if (state == DEACTIVATED) { - clearActive(app.id().name()); - delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app)); - } - } else if (event.type() == REMOVE) { - delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app)); - purgeApplication(app.id().name()); - } - } - } - - /** - * Determines if the application bits are available locally. - */ - private boolean appBitsAvailable(Application app) { - try { - ApplicationDescription appDesc = getApplicationDescription(app.id().name()); - return appDesc.version().equals(app.version()); - } catch (ApplicationException e) { - return false; - } - } - - /** - * Fetches the bits from the cluster peers if necessary. - */ - private void fetchBitsIfNeeded(Application app) { - if (!appBitsAvailable(app)) { - fetchBits(app); - } - } - - /** - * Installs the application if necessary from the application peers. - */ - private void installAppIfNeeded(Application app) { - if (!appBitsAvailable(app)) { - fetchBits(app); - delegate.notify(new ApplicationEvent(APP_INSTALLED, app)); - } - } - - /** - * Fetches the bits from the cluster peers. - */ - private void fetchBits(Application app) { - ControllerNode localNode = clusterService.getLocalNode(); - CountDownLatch latch = new CountDownLatch(1); - - // FIXME: send message with name & version to make sure we don't get served old bits - - log.info("Downloading bits for application {}", app.id().name()); - for (ControllerNode node : clusterService.getNodes()) { - if (latch.getCount() == 0) { - break; - } - if (node.equals(localNode)) { - continue; - } - clusterCommunicator.sendAndReceive(app.id().name(), - APP_BITS_REQUEST, - s -> s.getBytes(Charsets.UTF_8), - Function.identity(), - node.id()) - .whenCompleteAsync((bits, error) -> { - if (error == null && latch.getCount() > 0) { - saveApplication(new ByteArrayInputStream(bits)); - log.info("Downloaded bits for application {} from node {}", - app.id().name(), node.id()); - latch.countDown(); - } else if (error != null) { - log.warn("Unable to fetch bits for application {} from node {}", - app.id().name(), node.id()); - } - }, executor); - } - - try { - if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) { - log.warn("Unable to fetch bits for application {}", app.id().name()); - } - } catch (InterruptedException e) { - log.warn("Interrupted while fetching bits for application {}", app.id().name()); - } - } - - /** - * Prunes applications which are not in the map, but are on disk. - */ - private void pruneUninstalledApps() { - for (String name : getApplicationNames()) { - if (getApplication(getId(name)) == null) { - Application app = registerApp(getApplicationDescription(name)); - delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app)); - purgeApplication(app.id().name()); - } - } - } - - /** - * Produces a registered application from the supplied description. - */ - private Application registerApp(ApplicationDescription appDesc) { - ApplicationId appId = idStore.registerApplication(appDesc.name()); - return new DefaultApplication(appId, appDesc.version(), appDesc.description(), - appDesc.origin(), appDesc.role(), appDesc.permissions(), - appDesc.featuresRepo(), appDesc.features(), - appDesc.requiredApps()); - } -} diff --git a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java b/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java deleted file mode 100644 index b2a909ee..00000000 --- a/framework/src/onos/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2015 Open Networking Laboratory - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Implementation of distributed applications store. - */ -package org.onosproject.store.app; |